View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.net.InetSocketAddress;
24  import java.net.UnknownHostException;
25  import java.util.ArrayList;
26  import java.util.Collections;
27  import java.util.HashMap;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.NavigableMap;
32  import java.util.Set;
33  import java.util.TreeSet;
34  import java.util.concurrent.ConcurrentHashMap;
35  import java.util.concurrent.atomic.AtomicLong;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.Cell;
41  import org.apache.hadoop.hbase.CellScannable;
42  import org.apache.hadoop.hbase.CellScanner;
43  import org.apache.hadoop.hbase.CellUtil;
44  import org.apache.hadoop.hbase.DoNotRetryIOException;
45  import org.apache.hadoop.hbase.DroppedSnapshotException;
46  import org.apache.hadoop.hbase.HBaseIOException;
47  import org.apache.hadoop.hbase.HConstants;
48  import org.apache.hadoop.hbase.HRegionInfo;
49  import org.apache.hadoop.hbase.HTableDescriptor;
50  import org.apache.hadoop.hbase.MetaTableAccessor;
51  import org.apache.hadoop.hbase.NotServingRegionException;
52  import org.apache.hadoop.hbase.ServerName;
53  import org.apache.hadoop.hbase.TableName;
54  import org.apache.hadoop.hbase.UnknownScannerException;
55  import org.apache.hadoop.hbase.classification.InterfaceAudience;
56  import org.apache.hadoop.hbase.client.Append;
57  import org.apache.hadoop.hbase.client.ConnectionUtils;
58  import org.apache.hadoop.hbase.client.Delete;
59  import org.apache.hadoop.hbase.client.Durability;
60  import org.apache.hadoop.hbase.client.Get;
61  import org.apache.hadoop.hbase.client.Increment;
62  import org.apache.hadoop.hbase.client.Mutation;
63  import org.apache.hadoop.hbase.client.Put;
64  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
65  import org.apache.hadoop.hbase.client.Result;
66  import org.apache.hadoop.hbase.client.RowMutations;
67  import org.apache.hadoop.hbase.client.Scan;
68  import org.apache.hadoop.hbase.client.VersionInfoUtil;
69  import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
70  import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
71  import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
72  import org.apache.hadoop.hbase.exceptions.MergeRegionException;
73  import org.apache.hadoop.hbase.exceptions.OperationConflictException;
74  import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
75  import org.apache.hadoop.hbase.exceptions.ScannerResetException;
76  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
77  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
78  import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
79  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
80  import org.apache.hadoop.hbase.ipc.PriorityFunction;
81  import org.apache.hadoop.hbase.ipc.QosPriority;
82  import org.apache.hadoop.hbase.ipc.RpcCallContext;
83  import org.apache.hadoop.hbase.ipc.RpcServer;
84  import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
85  import org.apache.hadoop.hbase.ipc.RpcServerInterface;
86  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
87  import org.apache.hadoop.hbase.ipc.ServerRpcController;
88  import org.apache.hadoop.hbase.master.MasterRpcServices;
89  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
90  import org.apache.hadoop.hbase.protobuf.RequestConverter;
91  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
92  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
93  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
94  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
95  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
96  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
97  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
98  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
99  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
100 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
101 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
102 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
103 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
104 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
105 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
106 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
107 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
108 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
109 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
110 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
111 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
112 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
113 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
114 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
115 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
116 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
117 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
118 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
119 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
120 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
121 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
122 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
123 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
124 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
125 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
126 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
127 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
128 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
129 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
130 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
131 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
132 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
133 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
134 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
135 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
136 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
137 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
138 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
139 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
140 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
141 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
142 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
143 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
144 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
145 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
146 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
147 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
148 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
149 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
150 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
151 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
152 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
153 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
154 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
155 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
156 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
157 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
158 import org.apache.hadoop.hbase.quotas.OperationQuota;
159 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
160 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
161 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
162 import org.apache.hadoop.hbase.regionserver.Region.Operation;
163 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
164 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
165 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
166 import org.apache.hadoop.hbase.wal.WAL;
167 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
168 import org.apache.hadoop.hbase.util.Bytes;
169 import org.apache.hadoop.hbase.util.Counter;
170 import org.apache.hadoop.hbase.util.DNS;
171 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
172 import org.apache.hadoop.hbase.util.Pair;
173 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
174 import org.apache.hadoop.hbase.util.Strings;
175 import org.apache.hadoop.hbase.wal.WALKey;
176 import org.apache.hadoop.hbase.wal.WALSplitter;
177 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
178 import org.apache.zookeeper.KeeperException;
179 
180 import com.google.common.annotations.VisibleForTesting;
181 import com.google.protobuf.ByteString;
182 import com.google.protobuf.Message;
183 import com.google.protobuf.RpcController;
184 import com.google.protobuf.ServiceException;
185 import com.google.protobuf.TextFormat;
186 
187 /**
188  * Implements the regionserver RPC services.
189  */
190 @InterfaceAudience.Private
191 @SuppressWarnings("deprecation")
192 public class RSRpcServices implements HBaseRPCErrorHandler,
193     AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction {
194   protected static final Log LOG = LogFactory.getLog(RSRpcServices.class);
195 
196   /** RPC scheduler to use for the region server. */
197   public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
198     "hbase.region.server.rpc.scheduler.factory.class";
199 
200   /**
201    * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
202    * configuration exists to prevent the scenario where a time limit is specified to be so
203    * restrictive that the time limit is reached immediately (before any cells are scanned).
204    */
205   private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
206       "hbase.region.server.rpc.minimum.scan.time.limit.delta";
207   /**
208    * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
209    */
210   private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
211 
212   // Request counter. (Includes requests that are not serviced by regions.)
213   final Counter requestCount = new Counter();
214   // Server to handle client requests.
215   final RpcServerInterface rpcServer;
216   final InetSocketAddress isa;
217 
218   private final HRegionServer regionServer;
219   private final long maxScannerResultSize;
220 
221   // The reference to the priority extraction function
222   private final PriorityFunction priority;
223 
224   private final AtomicLong scannerIdGen = new AtomicLong(0L);
225   private final ConcurrentHashMap<String, RegionScannerHolder> scanners =
226     new ConcurrentHashMap<String, RegionScannerHolder>();
227 
228   /**
229    * The lease timeout period for client scanners (milliseconds).
230    */
231   private final int scannerLeaseTimeoutPeriod;
232 
233   /**
234    * The RPC timeout period (milliseconds)
235    */
236   private final int rpcTimeout;
237 
238   /**
239    * The minimum allowable delta to use for the scan limit
240    */
241   private final long minimumScanTimeLimitDelta;
242 
243   /**
244    * Holder class which holds the RegionScanner and nextCallSeq together.
245    */
246   private static class RegionScannerHolder {
247     private AtomicLong nextCallSeq = new AtomicLong(0);
248     private RegionScanner s;
249     private Region r;
250 
251     public RegionScannerHolder(RegionScanner s, Region r) {
252       this.s = s;
253       this.r = r;
254     }
255 
256     private long getNextCallSeq() {
257       return nextCallSeq.get();
258     }
259 
260     private void incNextCallSeq() {
261       nextCallSeq.incrementAndGet();
262     }
263 
264     private void rollbackNextCallSeq() {
265       nextCallSeq.decrementAndGet();
266     }
267   }
268 
269   /**
270    * Instantiated as a scanner lease. If the lease times out, the scanner is
271    * closed
272    */
273   private class ScannerListener implements LeaseListener {
274     private final String scannerName;
275 
276     ScannerListener(final String n) {
277       this.scannerName = n;
278     }
279 
280     @Override
281     public void leaseExpired() {
282       RegionScannerHolder rsh = scanners.remove(this.scannerName);
283       if (rsh != null) {
284         RegionScanner s = rsh.s;
285         LOG.info("Scanner " + this.scannerName + " lease expired on region "
286           + s.getRegionInfo().getRegionNameAsString());
287         Region region = null;
288         try {
289           region = regionServer.getRegion(s.getRegionInfo().getRegionName());
290           if (region != null && region.getCoprocessorHost() != null) {
291             region.getCoprocessorHost().preScannerClose(s);
292           }
293         } catch (IOException e) {
294           LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
295         } finally {
296           try {
297             s.close();
298             if (region != null && region.getCoprocessorHost() != null) {
299               region.getCoprocessorHost().postScannerClose(s);
300             }
301           } catch (IOException e) {
302             LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
303           }
304         }
305       } else {
306         LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
307           " scanner found, hence no chance to close that related scanner!");
308       }
309     }
310   }
311 
312   private static ResultOrException getResultOrException(
313       final ClientProtos.Result r, final int index, final ClientProtos.RegionLoadStats stats) {
314     return getResultOrException(ResponseConverter.buildActionResult(r, stats), index);
315   }
316 
317   private static ResultOrException getResultOrException(final Exception e, final int index) {
318     return getResultOrException(ResponseConverter.buildActionResult(e), index);
319   }
320 
321   private static ResultOrException getResultOrException(
322       final ResultOrException.Builder builder, final int index) {
323     return builder.setIndex(index).build();
324   }
325 
326   /**
327    * Starts the nonce operation for a mutation, if needed.
328    * @param mutation Mutation.
329    * @param nonceGroup Nonce group from the request.
330    * @returns Nonce used (can be NO_NONCE).
331    */
332   private long startNonceOperation(final MutationProto mutation, long nonceGroup)
333       throws IOException, OperationConflictException {
334     if (regionServer.nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE;
335     boolean canProceed = false;
336     try {
337       canProceed = regionServer.nonceManager.startOperation(
338         nonceGroup, mutation.getNonce(), regionServer);
339     } catch (InterruptedException ex) {
340       throw new InterruptedIOException("Nonce start operation interrupted");
341     }
342     if (!canProceed) {
343       // TODO: instead, we could convert append/increment to get w/mvcc
344       String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()
345         + "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
346         + "] may have already completed";
347       throw new OperationConflictException(message);
348     }
349     return mutation.getNonce();
350   }
351 
352   /**
353    * Ends nonce operation for a mutation, if needed.
354    * @param mutation Mutation.
355    * @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
356    * @param success Whether the operation for this nonce has succeeded.
357    */
358   private void endNonceOperation(final MutationProto mutation,
359       long nonceGroup, boolean success) {
360     if (regionServer.nonceManager != null && mutation.hasNonce()) {
361       regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
362     }
363   }
364 
365   /**
366    * @return True if current call supports cellblocks
367    */
368   private boolean isClientCellBlockSupport() {
369     RpcCallContext context = RpcServer.getCurrentCall();
370     return context != null && context.isClientCellBlockSupport();
371   }
372 
373   private void addResult(final MutateResponse.Builder builder,
374       final Result result, final PayloadCarryingRpcController rpcc) {
375     if (result == null) return;
376     if (isClientCellBlockSupport()) {
377       builder.setResult(ProtobufUtil.toResultNoData(result));
378       rpcc.setCellScanner(result.cellScanner());
379     } else {
380       ClientProtos.Result pbr = ProtobufUtil.toResult(result);
381       builder.setResult(pbr);
382     }
383   }
384 
385   private void addResults(final ScanResponse.Builder builder, final List<Result> results,
386       final RpcController controller, boolean isDefaultRegion) {
387     builder.setStale(!isDefaultRegion);
388     if (results == null || results.isEmpty()) return;
389     if (isClientCellBlockSupport()) {
390       for (Result res : results) {
391         builder.addCellsPerResult(res.size());
392         builder.addPartialFlagPerResult(res.isPartial());
393       }
394       ((PayloadCarryingRpcController)controller).
395         setCellScanner(CellUtil.createCellScanner(results));
396     } else {
397       for (Result res: results) {
398         ClientProtos.Result pbr = ProtobufUtil.toResult(res);
399         builder.addResults(pbr);
400       }
401     }
402   }
403 
404   /**
405    * Mutate a list of rows atomically.
406    *
407    * @param region
408    * @param actions
409    * @param cellScanner if non-null, the mutation data -- the Cell content.
410    * @throws IOException
411    */
412   private ClientProtos.RegionLoadStats mutateRows(final Region region,
413       final List<ClientProtos.Action> actions,
414       final CellScanner cellScanner) throws IOException {
415     if (!region.getRegionInfo().isMetaTable()) {
416       regionServer.cacheFlusher.reclaimMemStoreMemory();
417     }
418     RowMutations rm = null;
419     for (ClientProtos.Action action: actions) {
420       if (action.hasGet()) {
421         throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
422           action.getGet());
423       }
424       MutationType type = action.getMutation().getMutateType();
425       if (rm == null) {
426         rm = new RowMutations(action.getMutation().getRow().toByteArray());
427       }
428       switch (type) {
429       case PUT:
430         rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
431         break;
432       case DELETE:
433         rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
434         break;
435       default:
436           throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
437       }
438     }
439     region.mutateRow(rm);
440     return ((HRegion)region).getRegionStats();
441   }
442 
443   /**
444    * Mutate a list of rows atomically.
445    *
446    * @param region
447    * @param actions
448    * @param cellScanner if non-null, the mutation data -- the Cell content.
449    * @param row
450    * @param family
451    * @param qualifier
452    * @param compareOp
453    * @param comparator @throws IOException
454    */
455   private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions,
456       final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
457       CompareOp compareOp, ByteArrayComparable comparator) throws IOException {
458     if (!region.getRegionInfo().isMetaTable()) {
459       regionServer.cacheFlusher.reclaimMemStoreMemory();
460     }
461     RowMutations rm = null;
462     for (ClientProtos.Action action: actions) {
463       if (action.hasGet()) {
464         throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
465             action.getGet());
466       }
467       MutationType type = action.getMutation().getMutateType();
468       if (rm == null) {
469         rm = new RowMutations(action.getMutation().getRow().toByteArray());
470       }
471       switch (type) {
472       case PUT:
473         rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
474         break;
475       case DELETE:
476         rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
477         break;
478       default:
479         throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
480       }
481     }
482     return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE);
483   }
484 
485   /**
486    * Execute an append mutation.
487    *
488    * @param region
489    * @param m
490    * @param cellScanner
491    * @return result to return to client if default operation should be
492    * bypassed as indicated by RegionObserver, null otherwise
493    * @throws IOException
494    */
495   private Result append(final Region region, final OperationQuota quota, final MutationProto m,
496       final CellScanner cellScanner, long nonceGroup) throws IOException {
497     long before = EnvironmentEdgeManager.currentTime();
498     Append append = ProtobufUtil.toAppend(m, cellScanner);
499     quota.addMutation(append);
500     Result r = null;
501     if (region.getCoprocessorHost() != null) {
502       r = region.getCoprocessorHost().preAppend(append);
503     }
504     if (r == null) {
505       long nonce = startNonceOperation(m, nonceGroup);
506       boolean success = false;
507       try {
508         r = region.append(append, nonceGroup, nonce);
509         success = true;
510       } finally {
511         endNonceOperation(m, nonceGroup, success);
512       }
513       if (region.getCoprocessorHost() != null) {
514         region.getCoprocessorHost().postAppend(append, r);
515       }
516     }
517     if (regionServer.metricsRegionServer != null) {
518       regionServer.metricsRegionServer.updateAppend(
519         EnvironmentEdgeManager.currentTime() - before);
520     }
521     return r;
522   }
523 
524   /**
525    * Execute an increment mutation.
526    *
527    * @param region
528    * @param mutation
529    * @return the Result
530    * @throws IOException
531    */
532   private Result increment(final Region region, final OperationQuota quota,
533       final MutationProto mutation, final CellScanner cells, long nonceGroup) throws IOException {
534     long before = EnvironmentEdgeManager.currentTime();
535     Increment increment = ProtobufUtil.toIncrement(mutation, cells);
536     quota.addMutation(increment);
537     Result r = null;
538     if (region.getCoprocessorHost() != null) {
539       r = region.getCoprocessorHost().preIncrement(increment);
540     }
541     if (r == null) {
542       long nonce = startNonceOperation(mutation, nonceGroup);
543       boolean success = false;
544       try {
545         r = region.increment(increment, nonceGroup, nonce);
546         success = true;
547       } finally {
548         endNonceOperation(mutation, nonceGroup, success);
549       }
550       if (region.getCoprocessorHost() != null) {
551         r = region.getCoprocessorHost().postIncrement(increment, r);
552       }
553     }
554     if (regionServer.metricsRegionServer != null) {
555       regionServer.metricsRegionServer.updateIncrement(
556         EnvironmentEdgeManager.currentTime() - before);
557     }
558     return r;
559   }
560 
561   /**
562    * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
563    * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
564    * @param region
565    * @param actions
566    * @param cellScanner
567    * @param builder
568    * @param cellsToReturn  Could be null. May be allocated in this method.  This is what this
569    * method returns as a 'result'.
570    * @return Return the <code>cellScanner</code> passed
571    */
572   private List<CellScannable> doNonAtomicRegionMutation(final Region region,
573       final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
574       final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup) {
575     // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than do
576     // one at a time, we instead pass them in batch.  Be aware that the corresponding
577     // ResultOrException instance that matches each Put or Delete is then added down in the
578     // doBatchOp call.  We should be staying aligned though the Put and Delete are deferred/batched
579     List<ClientProtos.Action> mutations = null;
580     for (ClientProtos.Action action: actions.getActionList()) {
581       ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
582       try {
583         Result r = null;
584         if (action.hasGet()) {
585           long before = EnvironmentEdgeManager.currentTime();
586           try {
587             Get get = ProtobufUtil.toGet(action.getGet());
588             r = region.get(get);
589           } finally {
590             if (regionServer.metricsRegionServer != null) {
591               regionServer.metricsRegionServer.updateGet(
592                 EnvironmentEdgeManager.currentTime() - before);
593             }
594           }
595         } else if (action.hasServiceCall()) {
596           resultOrExceptionBuilder = ResultOrException.newBuilder();
597           try {
598             Message result = execServiceOnRegion(region, action.getServiceCall());
599             ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
600                 ClientProtos.CoprocessorServiceResult.newBuilder();
601             resultOrExceptionBuilder.setServiceResult(
602                 serviceResultBuilder.setValue(
603                   serviceResultBuilder.getValueBuilder()
604                     .setName(result.getClass().getName())
605                     .setValue(result.toByteString())));
606           } catch (IOException ioe) {
607             rpcServer.getMetrics().exception(ioe);
608             resultOrExceptionBuilder.setException(ResponseConverter.buildException(ioe));
609           }
610         } else if (action.hasMutation()) {
611           MutationType type = action.getMutation().getMutateType();
612           if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
613               !mutations.isEmpty()) {
614             // Flush out any Puts or Deletes already collected.
615             doBatchOp(builder, region, quota, mutations, cellScanner);
616             mutations.clear();
617           }
618           switch (type) {
619           case APPEND:
620             r = append(region, quota, action.getMutation(), cellScanner, nonceGroup);
621             break;
622           case INCREMENT:
623             r = increment(region, quota, action.getMutation(), cellScanner,  nonceGroup);
624             break;
625           case PUT:
626           case DELETE:
627             // Collect the individual mutations and apply in a batch
628             if (mutations == null) {
629               mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
630             }
631             mutations.add(action);
632             break;
633           default:
634             throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
635           }
636         } else {
637           throw new HBaseIOException("Unexpected Action type");
638         }
639         if (r != null) {
640           ClientProtos.Result pbResult = null;
641           if (isClientCellBlockSupport()) {
642             pbResult = ProtobufUtil.toResultNoData(r);
643             //  Hard to guess the size here.  Just make a rough guess.
644             if (cellsToReturn == null) cellsToReturn = new ArrayList<CellScannable>();
645             cellsToReturn.add(r);
646           } else {
647             pbResult = ProtobufUtil.toResult(r);
648           }
649           resultOrExceptionBuilder =
650             ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
651         }
652         // Could get to here and there was no result and no exception.  Presumes we added
653         // a Put or Delete to the collecting Mutations List for adding later.  In this
654         // case the corresponding ResultOrException instance for the Put or Delete will be added
655         // down in the doBatchOp method call rather than up here.
656       } catch (IOException ie) {
657         rpcServer.getMetrics().exception(ie);
658         resultOrExceptionBuilder = ResultOrException.newBuilder().
659           setException(ResponseConverter.buildException(ie));
660       }
661       if (resultOrExceptionBuilder != null) {
662         // Propagate index.
663         resultOrExceptionBuilder.setIndex(action.getIndex());
664         builder.addResultOrException(resultOrExceptionBuilder.build());
665       }
666     }
667     // Finish up any outstanding mutations
668     if (mutations != null && !mutations.isEmpty()) {
669       doBatchOp(builder, region, quota, mutations, cellScanner);
670     }
671     return cellsToReturn;
672   }
673 
674   /**
675    * Execute a list of Put/Delete mutations.
676    *
677    * @param builder
678    * @param region
679    * @param mutations
680    */
681   private void doBatchOp(final RegionActionResult.Builder builder, final Region region,
682       final OperationQuota quota,
683       final List<ClientProtos.Action> mutations, final CellScanner cells) {
684     Mutation[] mArray = new Mutation[mutations.size()];
685     long before = EnvironmentEdgeManager.currentTime();
686     boolean batchContainsPuts = false, batchContainsDelete = false;
687     try {
688       int i = 0;
689       for (ClientProtos.Action action: mutations) {
690         MutationProto m = action.getMutation();
691         Mutation mutation;
692         if (m.getMutateType() == MutationType.PUT) {
693           mutation = ProtobufUtil.toPut(m, cells);
694           batchContainsPuts = true;
695         } else {
696           mutation = ProtobufUtil.toDelete(m, cells);
697           batchContainsDelete = true;
698         }
699         mArray[i++] = mutation;
700         quota.addMutation(mutation);
701       }
702 
703       if (!region.getRegionInfo().isMetaTable()) {
704         regionServer.cacheFlusher.reclaimMemStoreMemory();
705       }
706 
707       OperationStatus codes[] = region.batchMutate(mArray, HConstants.NO_NONCE,
708         HConstants.NO_NONCE);
709       for (i = 0; i < codes.length; i++) {
710         int index = mutations.get(i).getIndex();
711         Exception e = null;
712         switch (codes[i].getOperationStatusCode()) {
713           case BAD_FAMILY:
714             e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
715             builder.addResultOrException(getResultOrException(e, index));
716             break;
717 
718           case SANITY_CHECK_FAILURE:
719             e = new FailedSanityCheckException(codes[i].getExceptionMsg());
720             builder.addResultOrException(getResultOrException(e, index));
721             break;
722 
723           default:
724             e = new DoNotRetryIOException(codes[i].getExceptionMsg());
725             builder.addResultOrException(getResultOrException(e, index));
726             break;
727 
728           case SUCCESS:
729             builder.addResultOrException(getResultOrException(
730               ClientProtos.Result.getDefaultInstance(), index,
731                 ((HRegion)region).getRegionStats()));
732             break;
733         }
734       }
735     } catch (IOException ie) {
736       for (int i = 0; i < mutations.size(); i++) {
737         builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
738       }
739     }
740     if (regionServer.metricsRegionServer != null) {
741       long after = EnvironmentEdgeManager.currentTime();
742       if (batchContainsPuts) {
743         regionServer.metricsRegionServer.updatePut(after - before);
744       }
745       if (batchContainsDelete) {
746         regionServer.metricsRegionServer.updateDelete(after - before);
747       }
748     }
749   }
750 
751   /**
752    * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
753    * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
754    * @param region
755    * @param mutations
756    * @param replaySeqId
757    * @return an array of OperationStatus which internally contains the OperationStatusCode and the
758    *         exceptionMessage if any
759    * @throws IOException
760    */
761   private OperationStatus [] doReplayBatchOp(final Region region,
762       final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
763     long before = EnvironmentEdgeManager.currentTime();
764     boolean batchContainsPuts = false, batchContainsDelete = false;
765     try {
766       for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
767         WALSplitter.MutationReplay m = it.next();
768 
769         if (m.type == MutationType.PUT) {
770           batchContainsPuts = true;
771         } else {
772           batchContainsDelete = true;
773         }
774 
775         NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
776         List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
777         if (metaCells != null && !metaCells.isEmpty()) {
778           for (Cell metaCell : metaCells) {
779             CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
780             boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
781             HRegion hRegion = (HRegion)region;
782             if (compactionDesc != null) {
783               // replay the compaction. Remove the files from stores only if we are the primary
784               // region replica (thus own the files)
785               hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
786                 replaySeqId);
787               continue;
788             }
789             FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
790             if (flushDesc != null && !isDefaultReplica) {
791               hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
792               continue;
793             }
794             RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
795             if (regionEvent != null && !isDefaultReplica) {
796               hRegion.replayWALRegionEventMarker(regionEvent);
797               continue;
798             }
799             BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
800             if (bulkLoadEvent != null) {
801               hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
802               continue;
803             }
804           }
805           it.remove();
806         }
807       }
808       requestCount.add(mutations.size());
809       if (!region.getRegionInfo().isMetaTable()) {
810         regionServer.cacheFlusher.reclaimMemStoreMemory();
811       }
812       return region.batchReplay(mutations.toArray(
813         new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
814     } finally {
815       if (regionServer.metricsRegionServer != null) {
816         long after = EnvironmentEdgeManager.currentTime();
817           if (batchContainsPuts) {
818           regionServer.metricsRegionServer.updatePut(after - before);
819         }
820         if (batchContainsDelete) {
821           regionServer.metricsRegionServer.updateDelete(after - before);
822         }
823       }
824     }
825   }
826 
827   private void closeAllScanners() {
828     // Close any outstanding scanners. Means they'll get an UnknownScanner
829     // exception next time they come in.
830     for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
831       try {
832         e.getValue().s.close();
833       } catch (IOException ioe) {
834         LOG.warn("Closing scanner " + e.getKey(), ioe);
835       }
836     }
837   }
838 
  /**
   * Creates the RPC services for the given region server. It does the following:
   * <ul>
   *   <li>Instantiates the configured RPC scheduler factory.</li>
   *   <li>Resolves the hostname/port and bind address.</li>
   *   <li>Builds the RpcServer.</li>
   *   <li>Reads scanner and RPC timeout settings from {@code rs.conf}.</li>
   * </ul>
   *
   * <p>Note: during construction this passes {@code this} to
   * {@link AnnotationReadingPriorityFunction} and to the scheduler factory. It also calls the
   * overridable {@link #getServices()}, so a subclass's own fields are not yet initialized when
   * those callbacks run.
   *
   * @param rs the hosting region server
   * @throws IOException if hostname lookup fails (UnknownHostException from getHostname) or the
   *         RpcServer listener channel is closed; RpcServer creation may also throw
   * @throws IllegalArgumentException if the scheduler factory class cannot be instantiated or the
   *         initial address cannot be resolved
   */
  public RSRpcServices(HRegionServer rs) throws IOException {
    regionServer = rs;

    // Instantiate the scheduler factory configured under REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS
    // (default SimpleRpcSchedulerFactory); reflection failures become IllegalArgumentException.
    RpcSchedulerFactory rpcSchedulerFactory;
    try {
      Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
          REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
          SimpleRpcSchedulerFactory.class);
      rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
    } catch (InstantiationException e) {
      throw new IllegalArgumentException(e);
    } catch (IllegalAccessException e) {
      throw new IllegalArgumentException(e);
    }
    // Server to handle client requests.
    // A master reads the master hostname/port/ipc.address settings; otherwise the regionserver
    // ones are used. initialIsa supplies the server name and final hostname; bindAddress is where
    // the RpcServer listens (hbase.*.ipc.address, defaulting to the resolved hostname).
    InetSocketAddress initialIsa;
    InetSocketAddress bindAddress;
    if(this instanceof MasterRpcServices) {
      String hostname = getHostname(rs.conf, true);
      int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port);
    } else {
      String hostname = getHostname(rs.conf, false);
      int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT,
        HConstants.DEFAULT_REGIONSERVER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress = new InetSocketAddress(
        rs.conf.get("hbase.regionserver.ipc.address", hostname), port);
    }
    // A null address means the hostname could not be resolved.
    if (initialIsa.getAddress() == null) {
      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
    }
    priority = new AnnotationReadingPriorityFunction(this);
    String name = rs.getProcessName() + "/" + initialIsa.toString();
    // Set how many times to retry talking to another server over HConnection.
    ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
    // getServices() is overridable and is invoked here, during construction.
    rpcServer = new RpcServer(rs, name, getServices(),
      bindAddress, // use final bindAddress for this server.
      rs.conf,
      rpcSchedulerFactory.create(rs.conf, this, rs));

    scannerLeaseTimeoutPeriod = rs.conf.getInt(
      HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    maxScannerResultSize = rs.conf.getLong(
      HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
    rpcTimeout = rs.conf.getInt(
      HConstants.HBASE_RPC_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = rs.conf.getLong(
      REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);

    InetSocketAddress address = rpcServer.getListenerAddress();
    if (address == null) {
      throw new IOException("Listener channel is closed");
    }
    // Set our address, however we need the final port that was given to rpcServer
    // (the listener's port may differ from the configured one, e.g. presumably when 0 is
    // configured; confirm).
    isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
    rpcServer.setErrorHandler(this);
    rs.setName(name);
  }
905 
906   public static String getHostname(Configuration conf, boolean isMaster)
907       throws UnknownHostException {
908     String hostname = conf.get(isMaster? HRegionServer.MASTER_HOSTNAME_KEY :
909       HRegionServer.RS_HOSTNAME_KEY);
910     if (hostname == null || hostname.isEmpty()) {
911       String masterOrRS = isMaster ? "master" : "regionserver";
912       return Strings.domainNamePointerToHostName(DNS.getDefaultHost(
913         conf.get("hbase." + masterOrRS + ".dns.interface", "default"),
914         conf.get("hbase." + masterOrRS + ".dns.nameserver", "default")));
915     } else {
916       LOG.info("hostname is configured to be " + hostname);
917       return hostname;
918     }
919   }
920 
921   RegionScanner getScanner(long scannerId) {
922     String scannerIdString = Long.toString(scannerId);
923     RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
924     if (scannerHolder != null) {
925       return scannerHolder.s;
926     }
927     return null;
928   }
929 
930   /**
931    * Get the vtime associated with the scanner.
932    * Currently the vtime is the number of "next" calls.
933    */
934   long getScannerVirtualTime(long scannerId) {
935     String scannerIdString = Long.toString(scannerId);
936     RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
937     if (scannerHolder != null) {
938       return scannerHolder.getNextCallSeq();
939     }
940     return 0L;
941   }
942 
943   long addScanner(RegionScanner s, Region r) throws LeaseStillHeldException {
944     long scannerId = this.scannerIdGen.incrementAndGet();
945     String scannerName = String.valueOf(scannerId);
946 
947     RegionScannerHolder existing =
948       scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
949     assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
950 
951     regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
952         new ScannerListener(scannerName));
953     return scannerId;
954   }
955 
956   /**
957    * Find the HRegion based on a region specifier
958    *
959    * @param regionSpecifier the region specifier
960    * @return the corresponding region
961    * @throws IOException if the specifier is not null,
962    *    but failed to find the region
963    */
964   Region getRegion(
965       final RegionSpecifier regionSpecifier) throws IOException {
966     return regionServer.getRegionByEncodedName(regionSpecifier.getValue().toByteArray(),
967         ProtobufUtil.getRegionEncodedName(regionSpecifier));
968   }
969 
970   @VisibleForTesting
971   public PriorityFunction getPriority() {
972     return priority;
973   }
974 
975   Configuration getConfiguration() {
976     return regionServer.getConfiguration();
977   }
978 
979   private RegionServerQuotaManager getQuotaManager() {
980     return regionServer.getRegionServerQuotaManager();
981   }
982 
983   void start() {
984     rpcServer.start();
985   }
986 
987   void stop() {
988     closeAllScanners();
989     rpcServer.stop();
990   }
991 
992   /**
993    * Called to verify that this server is up and running.
994    *
995    * @throws IOException
996    */
997   protected void checkOpen() throws IOException {
998     if (regionServer.isAborted()) {
999       throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
1000     }
1001     if (regionServer.isStopped()) {
1002       throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
1003     }
1004     if (!regionServer.fsOk) {
1005       throw new RegionServerStoppedException("File system not available");
1006     }
1007     if (!regionServer.isOnline()) {
1008       throw new ServerNotRunningYetException("Server is not running yet");
1009     }
1010   }
1011 
1012   /**
1013    * @return list of blocking services and their security info classes that this server supports
1014    */
1015   protected List<BlockingServiceAndInterface> getServices() {
1016     List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(2);
1017     bssi.add(new BlockingServiceAndInterface(
1018       ClientService.newReflectiveBlockingService(this),
1019       ClientService.BlockingInterface.class));
1020     bssi.add(new BlockingServiceAndInterface(
1021       AdminService.newReflectiveBlockingService(this),
1022       AdminService.BlockingInterface.class));
1023     return bssi;
1024   }
1025 
1026   public InetSocketAddress getSocketAddress() {
1027     return isa;
1028   }
1029 
1030   @Override
1031   public int getPriority(RequestHeader header, Message param) {
1032     return priority.getPriority(header, param);
1033   }
1034 
1035   @Override
1036   public long getDeadline(RequestHeader header, Message param) {
1037     return priority.getDeadline(header, param);
1038   }
1039 
1040   /*
1041    * Check if an OOME and, if so, abort immediately to avoid creating more objects.
1042    *
1043    * @param e
1044    *
1045    * @return True if we OOME'd and are aborting.
1046    */
1047   @Override
1048   public boolean checkOOME(final Throwable e) {
1049     boolean stop = false;
1050     try {
1051       if (e instanceof OutOfMemoryError
1052           || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1053           || (e.getMessage() != null && e.getMessage().contains(
1054               "java.lang.OutOfMemoryError"))) {
1055         stop = true;
1056         LOG.fatal("Run out of memory; " + getClass().getSimpleName()
1057           + " will abort itself immediately", e);
1058       }
1059     } finally {
1060       if (stop) {
1061         Runtime.getRuntime().halt(1);
1062       }
1063     }
1064     return stop;
1065   }
1066 
1067   /**
1068    * Close a region on the region server.
1069    *
1070    * @param controller the RPC controller
1071    * @param request the request
1072    * @throws ServiceException
1073    */
1074   @Override
1075   @QosPriority(priority=HConstants.ADMIN_QOS)
1076   public CloseRegionResponse closeRegion(final RpcController controller,
1077       final CloseRegionRequest request) throws ServiceException {
1078     final ServerName sn = (request.hasDestinationServer() ?
1079       ProtobufUtil.toServerName(request.getDestinationServer()) : null);
1080 
1081     try {
1082       checkOpen();
1083       if (request.hasServerStartCode()) {
1084         // check that we are the same server that this RPC is intended for.
1085         long serverStartCode = request.getServerStartCode();
1086         if (regionServer.serverName.getStartcode() !=  serverStartCode) {
1087           throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1088               "different server with startCode: " + serverStartCode + ", this server is: "
1089               + regionServer.serverName));
1090         }
1091       }
1092       final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1093 
1094       requestCount.increment();
1095       LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1096       CloseRegionCoordination.CloseRegionDetails crd = regionServer.getCoordinatedStateManager()
1097         .getCloseRegionCoordination().parseFromProtoRequest(request);
1098 
1099       boolean closed = regionServer.closeRegion(encodedRegionName, false, crd, sn);
1100       CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1101       return builder.build();
1102     } catch (IOException ie) {
1103       throw new ServiceException(ie);
1104     }
1105   }
1106 
1107   /**
1108    * Compact a region on the region server.
1109    *
1110    * @param controller the RPC controller
1111    * @param request the request
1112    * @throws ServiceException
1113    */
1114   @Override
1115   @QosPriority(priority=HConstants.ADMIN_QOS)
1116   public CompactRegionResponse compactRegion(final RpcController controller,
1117       final CompactRegionRequest request) throws ServiceException {
1118     try {
1119       checkOpen();
1120       requestCount.increment();
1121       Region region = getRegion(request.getRegion());
1122       region.startRegionOperation(Operation.COMPACT_REGION);
1123       LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1124       boolean major = false;
1125       byte [] family = null;
1126       Store store = null;
1127       if (request.hasFamily()) {
1128         family = request.getFamily().toByteArray();
1129         store = region.getStore(family);
1130         if (store == null) {
1131           throw new ServiceException(new IOException("column family " + Bytes.toString(family)
1132             + " does not exist in region " + region.getRegionInfo().getRegionNameAsString()));
1133         }
1134       }
1135       if (request.hasMajor()) {
1136         major = request.getMajor();
1137       }
1138       if (major) {
1139         if (family != null) {
1140           store.triggerMajorCompaction();
1141         } else {
1142           region.triggerMajorCompaction();
1143         }
1144       }
1145 
1146       String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):"";
1147       if (LOG.isTraceEnabled()) {
1148         LOG.trace("User-triggered compaction requested for region "
1149           + region.getRegionInfo().getRegionNameAsString() + familyLogMsg);
1150       }
1151       String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
1152       if(family != null) {
1153         regionServer.compactSplitThread.requestCompaction(region, store, log,
1154           Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1155       } else {
1156         regionServer.compactSplitThread.requestCompaction(region, log,
1157           Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1158       }
1159       return CompactRegionResponse.newBuilder().build();
1160     } catch (IOException ie) {
1161       throw new ServiceException(ie);
1162     }
1163   }
1164 
1165   /**
1166    * Flush a region on the region server.
1167    *
1168    * @param controller the RPC controller
1169    * @param request the request
1170    * @throws ServiceException
1171    */
1172   @Override
1173   @QosPriority(priority=HConstants.ADMIN_QOS)
1174   public FlushRegionResponse flushRegion(final RpcController controller,
1175       final FlushRegionRequest request) throws ServiceException {
1176     try {
1177       checkOpen();
1178       requestCount.increment();
1179       Region region = getRegion(request.getRegion());
1180       LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1181       boolean shouldFlush = true;
1182       if (request.hasIfOlderThanTs()) {
1183         shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1184       }
1185       FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1186       if (shouldFlush) {
1187         boolean writeFlushWalMarker =  request.hasWriteFlushWalMarker() ?
1188             request.getWriteFlushWalMarker() : false;
1189         long startTime = EnvironmentEdgeManager.currentTime();
1190         // Go behind the curtain so we can manage writing of the flush WAL marker
1191         HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl)
1192             ((HRegion)region).flushcache(true, writeFlushWalMarker);
1193         if (flushResult.isFlushSucceeded()) {
1194           long endTime = EnvironmentEdgeManager.currentTime();
1195           regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1196         }
1197         boolean compactionNeeded = flushResult.isCompactionNeeded();
1198         if (compactionNeeded) {
1199           regionServer.compactSplitThread.requestSystemCompaction(region,
1200             "Compaction through user triggered flush");
1201         }
1202         builder.setFlushed(flushResult.isFlushSucceeded());
1203         builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1204       }
1205       builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1206       return builder.build();
1207     } catch (DroppedSnapshotException ex) {
1208       // Cache flush can fail in a few places. If it fails in a critical
1209       // section, we get a DroppedSnapshotException and a replay of wal
1210       // is required. Currently the only way to do this is a restart of
1211       // the server.
1212       regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1213       throw new ServiceException(ex);
1214     } catch (IOException ie) {
1215       throw new ServiceException(ie);
1216     }
1217   }
1218 
1219   @Override
1220   @QosPriority(priority=HConstants.ADMIN_QOS)
1221   public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1222       final GetOnlineRegionRequest request) throws ServiceException {
1223     try {
1224       checkOpen();
1225       requestCount.increment();
1226       Map<String, Region> onlineRegions = regionServer.onlineRegions;
1227       List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
1228       for (Region region: onlineRegions.values()) {
1229         list.add(region.getRegionInfo());
1230       }
1231       Collections.sort(list);
1232       return ResponseConverter.buildGetOnlineRegionResponse(list);
1233     } catch (IOException ie) {
1234       throw new ServiceException(ie);
1235     }
1236   }
1237 
1238   @Override
1239   @QosPriority(priority=HConstants.ADMIN_QOS)
1240   public GetRegionInfoResponse getRegionInfo(final RpcController controller,
1241       final GetRegionInfoRequest request) throws ServiceException {
1242     try {
1243       checkOpen();
1244       requestCount.increment();
1245       Region region = getRegion(request.getRegion());
1246       HRegionInfo info = region.getRegionInfo();
1247       GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
1248       builder.setRegionInfo(HRegionInfo.convert(info));
1249       if (request.hasCompactionState() && request.getCompactionState()) {
1250         builder.setCompactionState(region.getCompactionState());
1251       }
1252       builder.setIsRecovering(region.isRecovering());
1253       return builder.build();
1254     } catch (IOException ie) {
1255       throw new ServiceException(ie);
1256     }
1257   }
1258 
1259   /**
1260    * Get some information of the region server.
1261    *
1262    * @param controller the RPC controller
1263    * @param request the request
1264    * @throws ServiceException
1265    */
1266   @Override
1267   @QosPriority(priority=HConstants.ADMIN_QOS)
1268   public GetServerInfoResponse getServerInfo(final RpcController controller,
1269       final GetServerInfoRequest request) throws ServiceException {
1270     try {
1271       checkOpen();
1272     } catch (IOException ie) {
1273       throw new ServiceException(ie);
1274     }
1275     requestCount.increment();
1276     int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
1277     return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
1278   }
1279 
1280   @Override
1281   @QosPriority(priority=HConstants.ADMIN_QOS)
1282   public GetStoreFileResponse getStoreFile(final RpcController controller,
1283       final GetStoreFileRequest request) throws ServiceException {
1284     try {
1285       checkOpen();
1286       Region region = getRegion(request.getRegion());
1287       requestCount.increment();
1288       Set<byte[]> columnFamilies;
1289       if (request.getFamilyCount() == 0) {
1290         columnFamilies = region.getTableDesc().getFamiliesKeys();
1291       } else {
1292         columnFamilies = new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
1293         for (ByteString cf: request.getFamilyList()) {
1294           columnFamilies.add(cf.toByteArray());
1295         }
1296       }
1297       int nCF = columnFamilies.size();
1298       List<String>  fileList = region.getStoreFileList(
1299         columnFamilies.toArray(new byte[nCF][]));
1300       GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1301       builder.addAllStoreFile(fileList);
1302       return builder.build();
1303     } catch (IOException ie) {
1304       throw new ServiceException(ie);
1305     }
1306   }
1307 
1308   /**
1309    * Merge regions on the region server.
1310    *
1311    * @param controller the RPC controller
1312    * @param request the request
1313    * @return merge regions response
1314    * @throws ServiceException
1315    */
1316   @Override
1317   @QosPriority(priority = HConstants.ADMIN_QOS)
1318   public MergeRegionsResponse mergeRegions(final RpcController controller,
1319       final MergeRegionsRequest request) throws ServiceException {
1320     try {
1321       checkOpen();
1322       requestCount.increment();
1323       Region regionA = getRegion(request.getRegionA());
1324       Region regionB = getRegion(request.getRegionB());
1325       boolean forcible = request.getForcible();
1326       long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1327       regionA.startRegionOperation(Operation.MERGE_REGION);
1328       regionB.startRegionOperation(Operation.MERGE_REGION);
1329       if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
1330           regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1331         throw new ServiceException(new MergeRegionException("Can't merge non-default replicas"));
1332       }
1333       LOG.info("Receiving merging request for  " + regionA + ", " + regionB
1334           + ",forcible=" + forcible);
1335       long startTime = EnvironmentEdgeManager.currentTime();
1336       FlushResult flushResult = regionA.flush(true);
1337       if (flushResult.isFlushSucceeded()) {
1338         long endTime = EnvironmentEdgeManager.currentTime();
1339         regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1340       }
1341       startTime = EnvironmentEdgeManager.currentTime();
1342       flushResult = regionB.flush(true);
1343       if (flushResult.isFlushSucceeded()) {
1344         long endTime = EnvironmentEdgeManager.currentTime();
1345         regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1346       }
1347       regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
1348           masterSystemTime, RpcServer.getRequestUser());
1349       return MergeRegionsResponse.newBuilder().build();
1350     } catch (DroppedSnapshotException ex) {
1351       regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1352       throw new ServiceException(ex);
1353     } catch (IOException ie) {
1354       throw new ServiceException(ie);
1355     }
1356   }
1357 
1358   /**
1359    * Open asynchronously a region or a set of regions on the region server.
1360    *
1361    * The opening is coordinated by ZooKeeper, and this method requires the znode to be created
1362    *  before being called. As a consequence, this method should be called only from the master.
1363    * <p>
1364    * Different manages states for the region are:<ul>
1365    *  <li>region not opened: the region opening will start asynchronously.</li>
1366    *  <li>a close is already in progress: this is considered as an error.</li>
1367    *  <li>an open is already in progress: this new open request will be ignored. This is important
1368    *  because the Master can do multiple requests if it crashes.</li>
1369    *  <li>the region is already opened:  this new open request will be ignored./li>
1370    *  </ul>
1371    * </p>
1372    * <p>
1373    * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
1374    * For a single region opening, errors are sent through a ServiceException. For bulk assign,
1375    * errors are put in the response as FAILED_OPENING.
1376    * </p>
1377    * @param controller the RPC controller
1378    * @param request the request
1379    * @throws ServiceException
1380    */
1381   @Override
1382   @QosPriority(priority=HConstants.ADMIN_QOS)
1383   public OpenRegionResponse openRegion(final RpcController controller,
1384       final OpenRegionRequest request) throws ServiceException {
1385     requestCount.increment();
1386     if (request.hasServerStartCode()) {
1387       // check that we are the same server that this RPC is intended for.
1388       long serverStartCode = request.getServerStartCode();
1389       if (regionServer.serverName.getStartcode() !=  serverStartCode) {
1390         throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1391             "different server with startCode: " + serverStartCode + ", this server is: "
1392             + regionServer.serverName));
1393       }
1394     }
1395 
1396     OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
1397     final int regionCount = request.getOpenInfoCount();
1398     final Map<TableName, HTableDescriptor> htds =
1399         new HashMap<TableName, HTableDescriptor>(regionCount);
1400     final boolean isBulkAssign = regionCount > 1;
1401     try {
1402       checkOpen();
1403     } catch (IOException ie) {
1404       TableName tableName = null;
1405       if (regionCount == 1) {
1406         RegionInfo ri = request.getOpenInfo(0).getRegion();
1407         if (ri != null) {
1408           tableName = ProtobufUtil.toTableName(ri.getTableName());
1409         }
1410       }
1411       if (!TableName.META_TABLE_NAME.equals(tableName)) {
1412         throw new ServiceException(ie);
1413       }
1414       // We are assigning meta, wait a little for regionserver to finish initialization.
1415       int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1416         HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout
1417       long endTime = System.currentTimeMillis() + timeout;
1418       synchronized (regionServer.online) {
1419         try {
1420           while (System.currentTimeMillis() <= endTime
1421               && !regionServer.isStopped() && !regionServer.isOnline()) {
1422             regionServer.online.wait(regionServer.msgInterval);
1423           }
1424           checkOpen();
1425         } catch (InterruptedException t) {
1426           Thread.currentThread().interrupt();
1427           throw new ServiceException(t);
1428         } catch (IOException e) {
1429           throw new ServiceException(e);
1430         }
1431       }
1432     }
1433 
1434     long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1435 
1436     for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
1437       final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
1438       OpenRegionCoordination coordination = regionServer.getCoordinatedStateManager().
1439         getOpenRegionCoordination();
1440       OpenRegionCoordination.OpenRegionDetails ord =
1441         coordination.parseFromProtoRequest(regionOpenInfo);
1442 
1443       HTableDescriptor htd;
1444       try {
1445         final Region onlineRegion = regionServer.getFromOnlineRegions(region.getEncodedName());
1446         if (onlineRegion != null) {
1447           //Check if the region can actually be opened.
1448           if (onlineRegion.getCoprocessorHost() != null) {
1449             onlineRegion.getCoprocessorHost().preOpen();
1450           }
1451           // See HBASE-5094. Cross check with hbase:meta if still this RS is owning
1452           // the region.
1453           Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
1454             regionServer.getConnection(), region.getRegionName());
1455           if (regionServer.serverName.equals(p.getSecond())) {
1456             Boolean closing = regionServer.regionsInTransitionInRS.get(region.getEncodedNameAsBytes());
1457             // Map regionsInTransitionInRSOnly has an entry for a region only if the region
1458             // is in transition on this RS, so here closing can be null. If not null, it can
1459             // be true or false. True means the region is opening on this RS; while false
1460             // means the region is closing. Only return ALREADY_OPENED if not closing (i.e.
1461             // not in transition any more, or still transition to open.
1462             if (!Boolean.FALSE.equals(closing)
1463                 && regionServer.getFromOnlineRegions(region.getEncodedName()) != null) {
1464               LOG.warn("Attempted open of " + region.getEncodedName()
1465                 + " but already online on this server");
1466               builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);
1467               continue;
1468             }
1469           } else {
1470             LOG.warn("The region " + region.getEncodedName() + " is online on this server"
1471               + " but hbase:meta does not have this server - continue opening.");
1472             regionServer.removeFromOnlineRegions(onlineRegion, null);
1473           }
1474         }
1475         LOG.info("Open " + region.getRegionNameAsString());
1476         htd = htds.get(region.getTable());
1477         if (htd == null) {
1478           htd = regionServer.tableDescriptors.get(region.getTable());
1479           htds.put(region.getTable(), htd);
1480         }
1481 
1482         final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent(
1483           region.getEncodedNameAsBytes(), Boolean.TRUE);
1484 
1485         if (Boolean.FALSE.equals(previous)) {
1486           // There is a close in progress. We need to mark this open as failed in ZK.
1487 
1488           coordination.tryTransitionFromOfflineToFailedOpen(regionServer, region, ord);
1489 
1490           throw new RegionAlreadyInTransitionException("Received OPEN for the region:"
1491             + region.getRegionNameAsString() + " , which we are already trying to CLOSE ");
1492         }
1493 
1494         if (Boolean.TRUE.equals(previous)) {
1495           // An open is in progress. This is supported, but let's log this.
1496           LOG.info("Receiving OPEN for the region:" +
1497             region.getRegionNameAsString() + " , which we are already trying to OPEN"
1498               + " - ignoring this new request for this region.");
1499         }
1500 
1501         // We are opening this region. If it moves back and forth for whatever reason, we don't
1502         // want to keep returning the stale moved record while we are opening/if we close again.
1503         regionServer.removeFromMovedRegions(region.getEncodedName());
1504 
1505         if (previous == null) {
1506           // check if the region to be opened is marked in recovering state in ZK
1507           if (ZKSplitLog.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
1508               region.getEncodedName())) {
1509             // Check if current region open is for distributedLogReplay. This check is to support
1510             // rolling restart/upgrade where we want to Master/RS see same configuration
1511             if (!regionOpenInfo.hasOpenForDistributedLogReplay()
1512                   || regionOpenInfo.getOpenForDistributedLogReplay()) {
1513               regionServer.recoveringRegions.put(region.getEncodedName(), null);
1514             } else {
1515               // Remove stale recovery region from ZK when we open region not for recovering which
1516               // could happen when turn distributedLogReplay off from on.
1517               List<String> tmpRegions = new ArrayList<String>();
1518               tmpRegions.add(region.getEncodedName());
1519               ZKSplitLog.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(),
1520                 tmpRegions);
1521             }
1522           }
1523           // If there is no action in progress, we can submit a specific handler.
1524           // Need to pass the expected version in the constructor.
1525           if (region.isMetaRegion()) {
1526             regionServer.service.submit(new OpenMetaHandler(
1527               regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
1528           } else {
1529             regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
1530               regionOpenInfo.getFavoredNodesList());
1531             regionServer.service.submit(new OpenRegionHandler(
1532               regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
1533           }
1534         }
1535 
1536         builder.addOpeningState(RegionOpeningState.OPENED);
1537 
1538       } catch (KeeperException zooKeeperEx) {
1539         LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx);
1540         throw new ServiceException(zooKeeperEx);
1541       } catch (IOException ie) {
1542         LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
1543         if (isBulkAssign) {
1544           builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
1545         } else {
1546           throw new ServiceException(ie);
1547         }
1548       }
1549     }
1550     return builder.build();
1551   }
1552 
1553   /**
1554    *  Wamrmup a region on this server.
1555    *
1556    * This method should only be called by Master. It synchrnously opens the region and
1557    * closes the region bringing the most important pages in cache.
1558    * <p>
1559    *
1560    * @param controller the RPC controller
1561    * @param request the request
1562    * @throws ServiceException
1563    */
1564   @Override
1565   public WarmupRegionResponse warmupRegion(final RpcController controller,
1566       final WarmupRegionRequest request) throws ServiceException {
1567 
1568     RegionInfo regionInfo = request.getRegionInfo();
1569     final HRegionInfo region = HRegionInfo.convert(regionInfo);
1570     HTableDescriptor htd;
1571     WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
1572 
1573     try {
1574       checkOpen();
1575       String encodedName = region.getEncodedName();
1576       byte[] encodedNameBytes = region.getEncodedNameAsBytes();
1577       final Region onlineRegion = regionServer.getFromOnlineRegions(encodedName);
1578 
1579       if (onlineRegion != null) {
1580         LOG.info("Region already online. Skipping warming up " + region);
1581         return response;
1582       }
1583 
1584       if (LOG.isDebugEnabled()) {
1585         LOG.debug("Warming up Region " + region.getRegionNameAsString());
1586       }
1587 
1588       htd = regionServer.tableDescriptors.get(region.getTable());
1589 
1590       if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
1591         LOG.info("Region is in transition. Skipping warmup " + region);
1592         return response;
1593       }
1594 
1595       HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
1596           regionServer.getConfiguration(), regionServer, null);
1597 
1598     } catch (IOException ie) {
1599       LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
1600       throw new ServiceException(ie);
1601     }
1602 
1603     return response;
1604   }
1605 
1606   /**
1607    * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
1608    * that the given mutations will be durable on the receiving RS if this method returns without any
1609    * exception.
1610    * @param controller the RPC controller
1611    * @param request the request
1612    * @throws ServiceException
1613    */
1614   @Override
1615   @QosPriority(priority = HConstants.REPLAY_QOS)
1616   public ReplicateWALEntryResponse replay(final RpcController controller,
1617       final ReplicateWALEntryRequest request) throws ServiceException {
1618     long before = EnvironmentEdgeManager.currentTime();
1619     CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
1620     try {
1621       checkOpen();
1622       List<WALEntry> entries = request.getEntryList();
1623       if (entries == null || entries.isEmpty()) {
1624         // empty input
1625         return ReplicateWALEntryResponse.newBuilder().build();
1626       }
1627       ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
1628       Region region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
1629       RegionCoprocessorHost coprocessorHost =
1630           ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
1631             ? region.getCoprocessorHost()
1632             : null; // do not invoke coprocessors if this is a secondary region replica
1633       List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
1634 
1635       // Skip adding the edits to WAL if this is a secondary region replica
1636       boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1637       Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
1638 
1639       for (WALEntry entry : entries) {
1640         if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
1641           throw new NotServingRegionException("Replay request contains entries from multiple " +
1642               "regions. First region:" + regionName.toStringUtf8() + " , other region:"
1643               + entry.getKey().getEncodedRegionName());
1644         }
1645         if (regionServer.nonceManager != null && isPrimary) {
1646           long nonceGroup = entry.getKey().hasNonceGroup()
1647             ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
1648           long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
1649           regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime());
1650         }
1651         Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
1652           new Pair<WALKey, WALEdit>();
1653         List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
1654           cells, walEntry, durability);
1655         if (coprocessorHost != null) {
1656           // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
1657           // KeyValue.
1658           if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
1659             walEntry.getSecond())) {
1660             // if bypass this log entry, ignore it ...
1661             continue;
1662           }
1663           walEntries.add(walEntry);
1664         }
1665         if(edits!=null && !edits.isEmpty()) {
1666           long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
1667             entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
1668           OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
1669           // check if it's a partial success
1670           for (int i = 0; result != null && i < result.length; i++) {
1671             if (result[i] != OperationStatus.SUCCESS) {
1672               throw new IOException(result[i].getExceptionMsg());
1673             }
1674           }
1675         }
1676       }
1677 
1678       //sync wal at the end because ASYNC_WAL is used above
1679       WAL wal = getWAL(region);
1680       if (wal != null) {
1681         wal.sync();
1682       }
1683 
1684       if (coprocessorHost != null) {
1685         for (Pair<WALKey, WALEdit> entry : walEntries) {
1686           coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
1687             entry.getSecond());
1688         }
1689       }
1690       return ReplicateWALEntryResponse.newBuilder().build();
1691     } catch (IOException ie) {
1692       throw new ServiceException(ie);
1693     } finally {
1694       if (regionServer.metricsRegionServer != null) {
1695         regionServer.metricsRegionServer.updateReplay(
1696           EnvironmentEdgeManager.currentTime() - before);
1697       }
1698     }
1699   }
1700 
1701   WAL getWAL(Region region) {
1702     return ((HRegion)region).getWAL();
1703   }
1704 
1705   /**
1706    * Replicate WAL entries on the region server.
1707    *
1708    * @param controller the RPC controller
1709    * @param request the request
1710    * @throws ServiceException
1711    */
1712   @Override
1713   @QosPriority(priority=HConstants.REPLICATION_QOS)
1714   public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
1715       final ReplicateWALEntryRequest request) throws ServiceException {
1716     try {
1717       checkOpen();
1718       if (regionServer.replicationSinkHandler != null) {
1719         requestCount.increment();
1720         List<WALEntry> entries = request.getEntryList();
1721         CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
1722         regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
1723         regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner);
1724         regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
1725         return ReplicateWALEntryResponse.newBuilder().build();
1726       } else {
1727         throw new ServiceException("Replication services are not initialized yet");
1728       }
1729     } catch (IOException ie) {
1730       throw new ServiceException(ie);
1731     }
1732   }
1733 
1734   /**
1735    * Roll the WAL writer of the region server.
1736    * @param controller the RPC controller
1737    * @param request the request
1738    * @throws ServiceException
1739    */
1740   @Override
1741   public RollWALWriterResponse rollWALWriter(final RpcController controller,
1742       final RollWALWriterRequest request) throws ServiceException {
1743     try {
1744       checkOpen();
1745       requestCount.increment();
1746       regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
1747       regionServer.walRoller.requestRollAll();
1748       regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
1749       RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
1750       return builder.build();
1751     } catch (IOException ie) {
1752       throw new ServiceException(ie);
1753     }
1754   }
1755 
1756   /**
1757    * Split a region on the region server.
1758    *
1759    * @param controller the RPC controller
1760    * @param request the request
1761    * @throws ServiceException
1762    */
1763   @Override
1764   @QosPriority(priority=HConstants.ADMIN_QOS)
1765   public SplitRegionResponse splitRegion(final RpcController controller,
1766       final SplitRegionRequest request) throws ServiceException {
1767     try {
1768       checkOpen();
1769       requestCount.increment();
1770       Region region = getRegion(request.getRegion());
1771       region.startRegionOperation(Operation.SPLIT_REGION);
1772       if (region.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1773         throw new IOException("Can't split replicas directly. "
1774             + "Replicas are auto-split when their primary is split.");
1775       }
1776       LOG.info("Splitting " + region.getRegionInfo().getRegionNameAsString());
1777       long startTime = EnvironmentEdgeManager.currentTime();
1778       FlushResult flushResult = region.flush(true);
1779       if (flushResult.isFlushSucceeded()) {
1780         long endTime = EnvironmentEdgeManager.currentTime();
1781         regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1782       }
1783       byte[] splitPoint = null;
1784       if (request.hasSplitPoint()) {
1785         splitPoint = request.getSplitPoint().toByteArray();
1786       }
1787       ((HRegion)region).forceSplit(splitPoint);
1788       regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit(),
1789         RpcServer.getRequestUser());
1790       return SplitRegionResponse.newBuilder().build();
1791     } catch (DroppedSnapshotException ex) {
1792       regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1793       throw new ServiceException(ex);
1794     } catch (IOException ie) {
1795       throw new ServiceException(ie);
1796     }
1797   }
1798 
1799   /**
1800    * Stop the region server.
1801    *
1802    * @param controller the RPC controller
1803    * @param request the request
1804    * @throws ServiceException
1805    */
1806   @Override
1807   @QosPriority(priority=HConstants.ADMIN_QOS)
1808   public StopServerResponse stopServer(final RpcController controller,
1809       final StopServerRequest request) throws ServiceException {
1810     requestCount.increment();
1811     String reason = request.getReason();
1812     regionServer.stop(reason);
1813     return StopServerResponse.newBuilder().build();
1814   }
1815 
1816   @Override
1817   public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
1818       UpdateFavoredNodesRequest request) throws ServiceException {
1819     List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
1820     UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
1821     for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
1822       HRegionInfo hri = HRegionInfo.convert(regionUpdateInfo.getRegion());
1823       regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
1824         regionUpdateInfo.getFavoredNodesList());
1825     }
1826     respBuilder.setResponse(openInfoList.size());
1827     return respBuilder.build();
1828   }
1829 
1830   /**
1831    * Atomically bulk load several HFiles into an open region
1832    * @return true if successful, false is failed but recoverably (no action)
1833    * @throws IOException if failed unrecoverably
1834    */
1835   @Override
1836   public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
1837       final BulkLoadHFileRequest request) throws ServiceException {
1838     try {
1839       checkOpen();
1840       requestCount.increment();
1841       Region region = getRegion(request.getRegion());
1842       List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
1843       for (FamilyPath familyPath: request.getFamilyPathList()) {
1844         familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
1845           familyPath.getPath()));
1846       }
1847       boolean bypass = false;
1848       if (region.getCoprocessorHost() != null) {
1849         bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
1850       }
1851       boolean loaded = false;
1852       if (!bypass) {
1853         loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
1854       }
1855       if (region.getCoprocessorHost() != null) {
1856         loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
1857       }
1858       BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
1859       builder.setLoaded(loaded);
1860       return builder.build();
1861     } catch (IOException ie) {
1862       throw new ServiceException(ie);
1863     }
1864   }
1865 
1866   @Override
1867   public CoprocessorServiceResponse execService(final RpcController controller,
1868       final CoprocessorServiceRequest request) throws ServiceException {
1869     try {
1870       checkOpen();
1871       requestCount.increment();
1872       Region region = getRegion(request.getRegion());
1873       Message result = execServiceOnRegion(region, request.getCall());
1874       CoprocessorServiceResponse.Builder builder =
1875         CoprocessorServiceResponse.newBuilder();
1876       builder.setRegion(RequestConverter.buildRegionSpecifier(
1877         RegionSpecifierType.REGION_NAME, region.getRegionInfo().getRegionName()));
1878       builder.setValue(
1879         builder.getValueBuilder().setName(result.getClass().getName())
1880           .setValue(result.toByteString()));
1881       return builder.build();
1882     } catch (IOException ie) {
1883       throw new ServiceException(ie);
1884     }
1885   }
1886 
1887   private Message execServiceOnRegion(Region region,
1888       final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
1889     // ignore the passed in controller (from the serialized call)
1890     ServerRpcController execController = new ServerRpcController();
1891     Message result = region.execService(execController, serviceCall);
1892     if (execController.getFailedOn() != null) {
1893       throw execController.getFailedOn();
1894     }
1895     return result;
1896   }
1897 
1898   /**
1899    * Get data from a table.
1900    *
1901    * @param controller the RPC controller
1902    * @param request the get request
1903    * @throws ServiceException
1904    */
1905   @Override
1906   public GetResponse get(final RpcController controller,
1907       final GetRequest request) throws ServiceException {
1908     long before = EnvironmentEdgeManager.currentTime();
1909     OperationQuota quota = null;
1910     try {
1911       checkOpen();
1912       requestCount.increment();
1913       Region region = getRegion(request.getRegion());
1914 
1915       GetResponse.Builder builder = GetResponse.newBuilder();
1916       ClientProtos.Get get = request.getGet();
1917       Boolean existence = null;
1918       Result r = null;
1919       quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
1920 
1921       if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
1922         if (get.getColumnCount() != 1) {
1923           throw new DoNotRetryIOException(
1924             "get ClosestRowBefore supports one and only one family now, not "
1925               + get.getColumnCount() + " families");
1926         }
1927         byte[] row = get.getRow().toByteArray();
1928         byte[] family = get.getColumn(0).getFamily().toByteArray();
1929         r = region.getClosestRowBefore(row, family);
1930       } else {
1931         Get clientGet = ProtobufUtil.toGet(get);
1932         if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
1933           existence = region.getCoprocessorHost().preExists(clientGet);
1934         }
1935         if (existence == null) {
1936           r = region.get(clientGet);
1937           if (get.getExistenceOnly()) {
1938             boolean exists = r.getExists();
1939             if (region.getCoprocessorHost() != null) {
1940               exists = region.getCoprocessorHost().postExists(clientGet, exists);
1941             }
1942             existence = exists;
1943           }
1944         }
1945       }
1946       if (existence != null){
1947         ClientProtos.Result pbr =
1948             ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
1949         builder.setResult(pbr);
1950       } else  if (r != null) {
1951         ClientProtos.Result pbr = ProtobufUtil.toResult(r);
1952         builder.setResult(pbr);
1953       }
1954       if (r != null) {
1955         quota.addGetResult(r);
1956       }
1957       return builder.build();
1958     } catch (IOException ie) {
1959       throw new ServiceException(ie);
1960     } finally {
1961       if (regionServer.metricsRegionServer != null) {
1962         regionServer.metricsRegionServer.updateGet(
1963           EnvironmentEdgeManager.currentTime() - before);
1964       }
1965       if (quota != null) {
1966         quota.close();
1967       }
1968     }
1969   }
1970 
1971   /**
1972    * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
1973    *
1974    * @param rpcc the RPC controller
1975    * @param request the multi request
1976    * @throws ServiceException
1977    */
1978   @Override
1979   public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
1980   throws ServiceException {
1981     try {
1982       checkOpen();
1983     } catch (IOException ie) {
1984       throw new ServiceException(ie);
1985     }
1986 
1987     // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
1988     // It is also the conduit via which we pass back data.
1989     PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
1990     CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
1991     if (controller != null) controller.setCellScanner(null);
1992 
1993     long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
1994 
1995     // this will contain all the cells that we need to return. It's created later, if needed.
1996     List<CellScannable> cellsToReturn = null;
1997     MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
1998     RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
1999     Boolean processed = null;
2000 
2001     for (RegionAction regionAction : request.getRegionActionList()) {
2002       this.requestCount.add(regionAction.getActionCount());
2003       OperationQuota quota;
2004       Region region;
2005       regionActionResultBuilder.clear();
2006       try {
2007         region = getRegion(regionAction.getRegion());
2008         quota = getQuotaManager().checkQuota(region, regionAction.getActionList());
2009       } catch (IOException e) {
2010         rpcServer.getMetrics().exception(e);
2011         regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2012         responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2013         if (cellScanner != null) {
2014           skipCellsForMutations(regionAction.getActionList(), cellScanner);
2015         }
2016         continue;  // For this region it's a failure.
2017       }
2018 
2019       if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2020         // How does this call happen?  It may need some work to play well w/ the surroundings.
2021         // Need to return an item per Action along w/ Action index.  TODO.
2022         try {
2023           if (request.hasCondition()) {
2024             Condition condition = request.getCondition();
2025             byte[] row = condition.getRow().toByteArray();
2026             byte[] family = condition.getFamily().toByteArray();
2027             byte[] qualifier = condition.getQualifier().toByteArray();
2028             CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2029             ByteArrayComparable comparator =
2030                 ProtobufUtil.toComparator(condition.getComparator());
2031             processed = checkAndRowMutate(region, regionAction.getActionList(),
2032                   cellScanner, row, family, qualifier, compareOp, comparator);
2033           } else {
2034             ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(),
2035                 cellScanner);
2036             // add the stats to the request
2037             if(stats != null) {
2038               responseBuilder.addRegionActionResult(RegionActionResult.newBuilder()
2039                   .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats)));
2040             }
2041             processed = Boolean.TRUE;
2042           }
2043         } catch (IOException e) {
2044           rpcServer.getMetrics().exception(e);
2045           // As it's atomic, we may expect it's a global failure.
2046           regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2047         }
2048       } else {
2049         // doNonAtomicRegionMutation manages the exception internally
2050         cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2051             regionActionResultBuilder, cellsToReturn, nonceGroup);
2052       }
2053       responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2054       quota.close();
2055     }
2056     // Load the controller with the Cells to return.
2057     if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2058       controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2059     }
2060     if (processed != null) responseBuilder.setProcessed(processed);
2061     return responseBuilder.build();
2062   }
2063 
2064   private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2065     for (Action action : actions) {
2066       skipCellsForMutation(action, cellScanner);
2067     }
2068   }
2069 
2070   private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2071     try {
2072       if (action.hasMutation()) {
2073         MutationProto m = action.getMutation();
2074         if (m.hasAssociatedCellCount()) {
2075           for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2076             cellScanner.advance();
2077           }
2078         }
2079       }
2080     } catch (IOException e) {
2081       // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2082       // marked as failed as we could not see the Region here. At client side the top level
2083       // RegionAction exception will be considered first.
2084       LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2085     }
2086   }
2087 
2088   /**
2089    * Mutate data in a table.
2090    *
2091    * @param rpcc the RPC controller
2092    * @param request the mutate request
2093    * @throws ServiceException
2094    */
2095   @Override
2096   public MutateResponse mutate(final RpcController rpcc,
2097       final MutateRequest request) throws ServiceException {
2098     // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
2099     // It is also the conduit via which we pass back data.
2100     PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2101     CellScanner cellScanner = controller != null? controller.cellScanner(): null;
2102     OperationQuota quota = null;
2103     // Clear scanner so we are not holding on to reference across call.
2104     if (controller != null) controller.setCellScanner(null);
2105     try {
2106       checkOpen();
2107       requestCount.increment();
2108       Region region = getRegion(request.getRegion());
2109       MutateResponse.Builder builder = MutateResponse.newBuilder();
2110       MutationProto mutation = request.getMutation();
2111       if (!region.getRegionInfo().isMetaTable()) {
2112         regionServer.cacheFlusher.reclaimMemStoreMemory();
2113       }
2114       long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2115       Result r = null;
2116       Boolean processed = null;
2117       MutationType type = mutation.getMutateType();
2118       long mutationSize = 0;
2119       quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
2120       switch (type) {
2121       case APPEND:
2122         // TODO: this doesn't actually check anything.
2123         r = append(region, quota, mutation, cellScanner, nonceGroup);
2124         break;
2125       case INCREMENT:
2126         // TODO: this doesn't actually check anything.
2127         r = increment(region, quota, mutation, cellScanner, nonceGroup);
2128         break;
2129       case PUT:
2130         Put put = ProtobufUtil.toPut(mutation, cellScanner);
2131         quota.addMutation(put);
2132         if (request.hasCondition()) {
2133           Condition condition = request.getCondition();
2134           byte[] row = condition.getRow().toByteArray();
2135           byte[] family = condition.getFamily().toByteArray();
2136           byte[] qualifier = condition.getQualifier().toByteArray();
2137           CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2138           ByteArrayComparable comparator =
2139             ProtobufUtil.toComparator(condition.getComparator());
2140           if (region.getCoprocessorHost() != null) {
2141             processed = region.getCoprocessorHost().preCheckAndPut(
2142               row, family, qualifier, compareOp, comparator, put);
2143           }
2144           if (processed == null) {
2145             boolean result = region.checkAndMutate(row, family,
2146               qualifier, compareOp, comparator, put, true);
2147             if (region.getCoprocessorHost() != null) {
2148               result = region.getCoprocessorHost().postCheckAndPut(row, family,
2149                 qualifier, compareOp, comparator, put, result);
2150             }
2151             processed = result;
2152           }
2153         } else {
2154           region.put(put);
2155           processed = Boolean.TRUE;
2156         }
2157         break;
2158       case DELETE:
2159         Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
2160         quota.addMutation(delete);
2161         if (request.hasCondition()) {
2162           Condition condition = request.getCondition();
2163           byte[] row = condition.getRow().toByteArray();
2164           byte[] family = condition.getFamily().toByteArray();
2165           byte[] qualifier = condition.getQualifier().toByteArray();
2166           CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2167           ByteArrayComparable comparator =
2168             ProtobufUtil.toComparator(condition.getComparator());
2169           if (region.getCoprocessorHost() != null) {
2170             processed = region.getCoprocessorHost().preCheckAndDelete(
2171               row, family, qualifier, compareOp, comparator, delete);
2172           }
2173           if (processed == null) {
2174             boolean result = region.checkAndMutate(row, family,
2175               qualifier, compareOp, comparator, delete, true);
2176             if (region.getCoprocessorHost() != null) {
2177               result = region.getCoprocessorHost().postCheckAndDelete(row, family,
2178                 qualifier, compareOp, comparator, delete, result);
2179             }
2180             processed = result;
2181           }
2182         } else {
2183           region.delete(delete);
2184           processed = Boolean.TRUE;
2185         }
2186         break;
2187       default:
2188           throw new DoNotRetryIOException(
2189             "Unsupported mutate type: " + type.name());
2190       }
2191       if (processed != null) builder.setProcessed(processed.booleanValue());
2192       addResult(builder, r, controller);
2193       return builder.build();
2194     } catch (IOException ie) {
2195       regionServer.checkFileSystem();
2196       throw new ServiceException(ie);
2197     } finally {
2198       if (quota != null) {
2199         quota.close();
2200       }
2201     }
2202   }
2203 
2204   /**
2205    * Scan data in a table.
2206    *
2207    * @param controller the RPC controller
2208    * @param request the scan request
2209    * @throws ServiceException
2210    */
2211   @Override
2212   public ScanResponse scan(final RpcController controller, final ScanRequest request)
2213   throws ServiceException {
2214     OperationQuota quota = null;
2215     Leases.Lease lease = null;
2216     String scannerName = null;
2217     try {
2218       if (!request.hasScannerId() && !request.hasScan()) {
2219         throw new DoNotRetryIOException(
2220           "Missing required input: scannerId or scan");
2221       }
2222       long scannerId = -1;
2223       if (request.hasScannerId()) {
2224         scannerId = request.getScannerId();
2225         scannerName = String.valueOf(scannerId);
2226       }
2227       try {
2228         checkOpen();
2229       } catch (IOException e) {
2230         // If checkOpen failed, server not running or filesystem gone,
2231         // cancel this lease; filesystem is gone or we're closing or something.
2232         if (scannerName != null) {
2233           LOG.debug("Server shutting down and client tried to access missing scanner "
2234             + scannerName);
2235           if (regionServer.leases != null) {
2236             try {
2237               regionServer.leases.cancelLease(scannerName);
2238             } catch (LeaseException le) {
2239               // No problem, ignore
2240             }
2241           }
2242         }
2243         throw e;
2244       }
2245       requestCount.increment();
2246 
2247       int ttl = 0;
2248       Region region = null;
2249       RegionScanner scanner = null;
2250       RegionScannerHolder rsh = null;
2251       boolean moreResults = true;
2252       boolean closeScanner = false;
2253       boolean isSmallScan = false;
2254       ScanResponse.Builder builder = ScanResponse.newBuilder();
2255       if (request.hasCloseScanner()) {
2256         closeScanner = request.getCloseScanner();
2257       }
2258       int rows = closeScanner ? 0 : 1;
2259       if (request.hasNumberOfRows()) {
2260         rows = request.getNumberOfRows();
2261       }
2262       if (request.hasScannerId()) {
2263         rsh = scanners.get(scannerName);
2264         if (rsh == null) {
2265           LOG.warn("Client tried to access missing scanner " + scannerName);
2266           throw new UnknownScannerException(
2267             "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
2268                 + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
2269                 + "long wait between consecutive client checkins, c) Server may be closing down, "
2270                 + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
2271                 + "possible fix would be increasing the value of"
2272                 + "'hbase.client.scanner.timeout.period' configuration.");
2273         }
2274         scanner = rsh.s;
2275         HRegionInfo hri = scanner.getRegionInfo();
2276         region = regionServer.getRegion(hri.getRegionName());
2277         if (region != rsh.r) { // Yes, should be the same instance
2278           throw new NotServingRegionException("Region was re-opened after the scanner"
2279             + scannerName + " was created: " + hri.getRegionNameAsString());
2280         }
2281       } else {
2282         region = getRegion(request.getRegion());
2283         ClientProtos.Scan protoScan = request.getScan();
2284         boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
2285         Scan scan = ProtobufUtil.toScan(protoScan);
2286         // if the request doesn't set this, get the default region setting.
2287         if (!isLoadingCfsOnDemandSet) {
2288           scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
2289         }
2290 
2291         isSmallScan = scan.isSmall();
2292         if (!scan.hasFamilies()) {
2293           // Adding all families to scanner
2294           for (byte[] family: region.getTableDesc().getFamiliesKeys()) {
2295             scan.addFamily(family);
2296           }
2297         }
2298 
2299         if (region.getCoprocessorHost() != null) {
2300           scanner = region.getCoprocessorHost().preScannerOpen(scan);
2301         }
2302         if (scanner == null) {
2303           scanner = region.getScanner(scan);
2304         }
2305         if (region.getCoprocessorHost() != null) {
2306           scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
2307         }
2308         scannerId = addScanner(scanner, region);
2309         scannerName = String.valueOf(scannerId);
2310         ttl = this.scannerLeaseTimeoutPeriod;
2311       }
2312       if (request.hasRenew() && request.getRenew()) {
2313         rsh = scanners.get(scannerName);
2314         lease = regionServer.leases.removeLease(scannerName);
2315         if (lease != null && rsh != null) {
2316           regionServer.leases.addLease(lease);
2317           // Increment the nextCallSeq value which is the next expected from client.
2318           rsh.incNextCallSeq();
2319         }
2320         return builder.build();
2321       }
2322 
2323       quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
2324       long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
2325       if (rows > 0) {
2326         // if nextCallSeq does not match throw Exception straight away. This needs to be
2327         // performed even before checking of Lease.
2328         // See HBASE-5974
2329         if (request.hasNextCallSeq()) {
2330           if (rsh == null) {
2331             rsh = scanners.get(scannerName);
2332           }
2333           if (rsh != null) {
2334             if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
2335               throw new OutOfOrderScannerNextException(
2336                 "Expected nextCallSeq: " + rsh.getNextCallSeq()
2337                 + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
2338                 "; request=" + TextFormat.shortDebugString(request));
2339             }
2340             // Increment the nextCallSeq value which is the next expected from client.
2341             rsh.incNextCallSeq();
2342           }
2343         }
2344         try {
2345           // Remove lease while its being processed in server; protects against case
2346           // where processing of request takes > lease expiration time.
2347           lease = regionServer.leases.removeLease(scannerName);
2348           List<Result> results = new ArrayList<Result>();
2349           long totalCellSize = 0;
2350           long currentScanResultSize = 0;
2351 
2352           boolean done = false;
2353           // Call coprocessor. Get region info from scanner.
2354           if (region != null && region.getCoprocessorHost() != null) {
2355             Boolean bypass = region.getCoprocessorHost().preScannerNext(
2356               scanner, results, rows);
2357             if (!results.isEmpty()) {
2358               for (Result r : results) {
2359                 for (Cell cell : r.rawCells()) {
2360                   totalCellSize += CellUtil.estimatedSerializedSizeOf(cell);
2361                   currentScanResultSize += CellUtil.estimatedHeapSizeOfWithoutTags(cell);
2362                 }
2363               }
2364             }
2365             if (bypass != null && bypass.booleanValue()) {
2366               done = true;
2367             }
2368           }
2369 
2370           if (!done) {
2371             long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
2372             if (maxResultSize <= 0) {
2373               maxResultSize = maxQuotaResultSize;
2374             }
2375             // This is cells inside a row. Default size is 10 so if many versions or many cfs,
2376             // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
2377             // arbitrary 32. TODO: keep record of general size of results being returned.
2378             List<Cell> values = new ArrayList<Cell>(32);
2379             region.startRegionOperation(Operation.SCAN);
2380             try {
2381               int i = 0;
2382               synchronized(scanner) {
2383                 boolean stale = (region.getRegionInfo().getReplicaId() != 0);
2384                 boolean clientHandlesPartials =
2385                     request.hasClientHandlesPartials() && request.getClientHandlesPartials();
2386                 boolean clientHandlesHeartbeats =
2387                     request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
2388 
2389                 // On the server side we must ensure that the correct ordering of partial results is
2390                 // returned to the client to allow them to properly reconstruct the partial results.
2391                 // If the coprocessor host is adding to the result list, we cannot guarantee the
2392                 // correct ordering of partial results and so we prevent partial results from being
2393                 // formed.
2394                 boolean serverGuaranteesOrderOfPartials = currentScanResultSize == 0;
2395                 boolean allowPartialResults =
2396                     clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
2397                 boolean moreRows = false;
2398 
2399                 // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
2400                 // certain time threshold on the server. When the time threshold is exceeded, the
2401                 // server stops the scan and sends back whatever Results it has accumulated within
2402                 // that time period (may be empty). Since heartbeat messages have the potential to
2403                 // create partial Results (in the event that the timeout occurs in the middle of a
2404                 // row), we must only generate heartbeat messages when the client can handle both
2405                 // heartbeats AND partials
2406                 boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
2407 
2408                 // Default value of timeLimit is negative to indicate no timeLimit should be
2409                 // enforced.
2410                 long timeLimit = -1;
2411 
2412                 // Set the time limit to be half of the more restrictive timeout value (one of the
2413                 // timeout values must be positive). In the event that both values are positive, the
2414                 // more restrictive of the two is used to calculate the limit.
2415                 if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
2416                   long timeLimitDelta;
2417                   if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
2418                     timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
2419                   } else {
2420                     timeLimitDelta =
2421                         scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
2422                   }
2423                   // Use half of whichever timeout value was more restrictive... But don't allow
2424                   // the time limit to be less than the allowable minimum (could cause an
2425                   // immediatate timeout before scanning any data).
2426                   timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
2427                   timeLimit = System.currentTimeMillis() + timeLimitDelta;
2428                 }
2429 
2430                 final LimitScope sizeScope =
2431                     allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2432                 final LimitScope timeScope =
2433                     allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2434 
2435                 // Configure with limits for this RPC. Set keep progress true since size progress
2436                 // towards size limit should be kept between calls to nextRaw
2437                 ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
2438                 contextBuilder.setSizeLimit(sizeScope, maxResultSize);
2439                 contextBuilder.setBatchLimit(scanner.getBatch());
2440                 contextBuilder.setTimeLimit(timeScope, timeLimit);
2441                 ScannerContext scannerContext = contextBuilder.build();
2442 
2443                 boolean limitReached = false;
2444                 while (i < rows) {
2445                   // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
2446                   // batch limit is a limit on the number of cells per Result. Thus, if progress is
2447                   // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
2448                   // reset the batch progress between nextRaw invocations since we don't want the
2449                   // batch progress from previous calls to affect future calls
2450                   scannerContext.setBatchProgress(0);
2451 
2452                   // Collect values to be returned here
2453                   moreRows = scanner.nextRaw(values, scannerContext);
2454 
2455                   if (!values.isEmpty()) {
2456                     for (Cell cell : values) {
2457                       totalCellSize += CellUtil.estimatedSerializedSizeOf(cell);
2458                     }
2459                     final boolean partial = scannerContext.partialResultFormed();
2460                     results.add(Result.create(values, null, stale, partial));
2461                     i++;
2462                   }
2463 
2464                   boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
2465                   boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
2466                   boolean rowLimitReached = i >= rows;
2467                   limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
2468 
2469                   if (limitReached || !moreRows) {
2470                     if (LOG.isTraceEnabled()) {
2471                       LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: "
2472                           + moreRows + " scannerContext: " + scannerContext);
2473                     }
2474                     // We only want to mark a ScanResponse as a heartbeat message in the event that
2475                     // there are more values to be read server side. If there aren't more values,
2476                     // marking it as a heartbeat is wasteful because the client will need to issue
2477                     // another ScanRequest only to realize that they already have all the values
2478                     if (moreRows) {
2479                       // Heartbeat messages occur when the time limit has been reached.
2480                       builder.setHeartbeatMessage(timeLimitReached);
2481                     }
2482                     break;
2483                   }
2484                   values.clear();
2485                 }
2486 
2487                 if (limitReached || moreRows) {
2488                   // We stopped prematurely
2489                   builder.setMoreResultsInRegion(true);
2490                 } else {
2491                   // We didn't get a single batch
2492                   builder.setMoreResultsInRegion(false);
2493                 }
2494               }
2495               region.updateReadRequestsCount(i);
2496               region.getMetrics().updateScanNext(totalCellSize);
2497               if (regionServer.metricsRegionServer != null) {
2498                 regionServer.metricsRegionServer.updateScannerNext(totalCellSize);
2499               }
2500             } finally {
2501               region.closeRegionOperation();
2502             }
2503 
2504             // coprocessor postNext hook
2505             if (region != null && region.getCoprocessorHost() != null) {
2506               region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
2507             }
2508           }
2509 
2510           quota.addScanResult(results);
2511 
2512           // If the scanner's filter - if any - is done with the scan
2513           // and wants to tell the client to stop the scan. This is done by passing
2514           // a null result, and setting moreResults to false.
2515           if (scanner.isFilterDone() && results.isEmpty()) {
2516             moreResults = false;
2517             results = null;
2518           } else {
2519             addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
2520           }
2521         } catch (IOException e) {
2522           // The scanner state might be left in a dirty state, so we will tell the Client to
2523           // fail this RPC and close the scanner while opening up another one from the start of
2524           // row that the client has last seen.
2525           closeScanner(region, scanner, scannerName);
2526 
2527           // We closed the scanner already. Instead of throwing the IOException, and client
2528           // retrying with the same scannerId only to get USE on the next RPC, we directly throw
2529           // a special exception to save an RPC.
2530           RpcCallContext context = RpcServer.getCurrentCall();
2531           if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
2532             // 1.4.0+ clients know how to handle
2533             throw new ScannerResetException("Scanner is closed on the server-side", e);
2534           } else {
2535             // older clients do not know about SRE. Just throw USE, which they will handle
2536             throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
2537                 + " scanner state for clients older than 1.3.", e);
2538           }
2539         } finally {
2540           // We're done. On way out re-add the above removed lease.
2541           // Adding resets expiration time on lease.
2542           if (scanners.containsKey(scannerName)) {
2543             if (lease != null) regionServer.leases.addLease(lease);
2544             ttl = this.scannerLeaseTimeoutPeriod;
2545           }
2546         }
2547       }
2548 
2549       if (!moreResults || closeScanner) {
2550         ttl = 0;
2551         moreResults = false;
2552         closeScanner(region, scanner, scannerName);
2553       }
2554 
2555       if (ttl > 0) {
2556         builder.setTtl(ttl);
2557       }
2558       builder.setScannerId(scannerId);
2559       builder.setMoreResults(moreResults);
2560       return builder.build();
2561     } catch (IOException ie) {
2562       if (scannerName != null && ie instanceof NotServingRegionException) {
2563         RegionScannerHolder rsh = scanners.remove(scannerName);
2564         if (rsh != null) {
2565           try {
2566             RegionScanner scanner = rsh.s;
2567             LOG.warn(scannerName + " encountered " + ie.getMessage() + ", closing ...");
2568             scanner.close();
2569             regionServer.leases.cancelLease(scannerName);
2570           } catch (IOException e) {
2571            LOG.warn("Getting exception closing " + scannerName, e);
2572           }
2573         }
2574       }
2575       throw new ServiceException(ie);
2576     } finally {
2577       if (quota != null) {
2578         quota.close();
2579       }
2580     }
2581   }
2582 
2583   private boolean closeScanner(Region region, RegionScanner scanner, String scannerName)
2584       throws IOException {
2585     if (region != null && region.getCoprocessorHost() != null) {
2586       if (region.getCoprocessorHost().preScannerClose(scanner)) {
2587         return true; // bypass
2588       }
2589     }
2590     RegionScannerHolder rsh = scanners.remove(scannerName);
2591     if (rsh != null) {
2592       scanner = rsh.s;
2593       scanner.close();
2594       try {
2595         regionServer.leases.cancelLease(scannerName);
2596       } catch (LeaseException le) {
2597         // No problem, ignore
2598         if (LOG.isTraceEnabled()) {
2599           LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
2600         }
2601       }
2602       if (region != null && region.getCoprocessorHost() != null) {
2603         region.getCoprocessorHost().postScannerClose(scanner);
2604       }
2605     }
2606     return false;
2607   }
2608 
2609   @Override
2610   public CoprocessorServiceResponse execRegionServerService(RpcController controller,
2611       CoprocessorServiceRequest request) throws ServiceException {
2612     return regionServer.execRegionServerService(controller, request);
2613   }
2614 
2615   @Override
2616   public UpdateConfigurationResponse updateConfiguration(
2617       RpcController controller, UpdateConfigurationRequest request)
2618       throws ServiceException {
2619     try {
2620       this.regionServer.updateConfiguration();
2621     } catch (Exception e) {
2622       throw new ServiceException(e);
2623     }
2624     return UpdateConfigurationResponse.getDefaultInstance();
2625   }
2626 }