View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.FileNotFoundException;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.net.BindException;
25  import java.net.InetSocketAddress;
26  import java.net.UnknownHostException;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.HashMap;
30  import java.util.Iterator;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.Map.Entry;
34  import java.util.NavigableMap;
35  import java.util.Set;
36  import java.util.TreeSet;
37  import java.util.concurrent.ConcurrentHashMap;
38  import java.util.concurrent.atomic.AtomicLong;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.hbase.Cell;
44  import org.apache.hadoop.hbase.CellScannable;
45  import org.apache.hadoop.hbase.CellScanner;
46  import org.apache.hadoop.hbase.CellUtil;
47  import org.apache.hadoop.hbase.DoNotRetryIOException;
48  import org.apache.hadoop.hbase.DroppedSnapshotException;
49  import org.apache.hadoop.hbase.HBaseIOException;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.HTableDescriptor;
53  import org.apache.hadoop.hbase.MetaTableAccessor;
54  import org.apache.hadoop.hbase.MultiActionResultTooLarge;
55  import org.apache.hadoop.hbase.NotServingRegionException;
56  import org.apache.hadoop.hbase.ServerName;
57  import org.apache.hadoop.hbase.TableName;
58  import org.apache.hadoop.hbase.UnknownScannerException;
59  import org.apache.hadoop.hbase.classification.InterfaceAudience;
60  import org.apache.hadoop.hbase.client.Append;
61  import org.apache.hadoop.hbase.client.ConnectionUtils;
62  import org.apache.hadoop.hbase.client.Delete;
63  import org.apache.hadoop.hbase.client.Durability;
64  import org.apache.hadoop.hbase.client.Get;
65  import org.apache.hadoop.hbase.client.Increment;
66  import org.apache.hadoop.hbase.client.Mutation;
67  import org.apache.hadoop.hbase.client.Put;
68  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
69  import org.apache.hadoop.hbase.client.Result;
70  import org.apache.hadoop.hbase.client.RowMutations;
71  import org.apache.hadoop.hbase.client.Scan;
72  import org.apache.hadoop.hbase.client.VersionInfoUtil;
73  import org.apache.hadoop.hbase.conf.ConfigurationObserver;
74  import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
75  import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
76  import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
77  import org.apache.hadoop.hbase.exceptions.MergeRegionException;
78  import org.apache.hadoop.hbase.exceptions.OperationConflictException;
79  import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
80  import org.apache.hadoop.hbase.exceptions.ScannerResetException;
81  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
82  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
83  import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
84  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
85  import org.apache.hadoop.hbase.ipc.PriorityFunction;
86  import org.apache.hadoop.hbase.ipc.QosPriority;
87  import org.apache.hadoop.hbase.ipc.RpcCallContext;
88  import org.apache.hadoop.hbase.ipc.RpcServer;
89  import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
90  import org.apache.hadoop.hbase.ipc.RpcServerInterface;
91  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
92  import org.apache.hadoop.hbase.ipc.ServerRpcController;
93  import org.apache.hadoop.hbase.master.MasterRpcServices;
94  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
95  import org.apache.hadoop.hbase.protobuf.RequestConverter;
96  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
97  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
98  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
99  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
100 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
101 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
102 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
103 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
104 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
105 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
106 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
107 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
108 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
109 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
110 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
111 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
112 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
113 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
114 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
115 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
116 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
117 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
118 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
119 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
120 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
121 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
122 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
123 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
124 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
125 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
126 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
127 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
128 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
129 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
130 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
131 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
132 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
133 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
134 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
135 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
136 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
137 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
138 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
139 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
140 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
141 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
142 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
143 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
144 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
145 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
146 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
147 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
148 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
149 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
150 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
151 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
152 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
153 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
154 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
155 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair;
156 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
157 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
158 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
159 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics;
160 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
161 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
162 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
163 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
164 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
165 import org.apache.hadoop.hbase.quotas.OperationQuota;
166 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
167 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
168 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
169 import org.apache.hadoop.hbase.regionserver.Region.Operation;
170 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
171 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
172 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
173 import org.apache.hadoop.hbase.wal.WAL;
174 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
175 import org.apache.hadoop.hbase.security.User;
176 import org.apache.hadoop.hbase.util.Bytes;
177 import org.apache.hadoop.hbase.util.Counter;
178 import org.apache.hadoop.hbase.util.DNS;
179 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
180 import org.apache.hadoop.hbase.util.Pair;
181 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
182 import org.apache.hadoop.hbase.util.Strings;
183 import org.apache.hadoop.hbase.wal.WALKey;
184 import org.apache.hadoop.hbase.wal.WALSplitter;
185 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
186 import org.apache.zookeeper.KeeperException;
187 
188 import com.google.common.annotations.VisibleForTesting;
189 import com.google.protobuf.ByteString;
190 import com.google.protobuf.Message;
191 import com.google.protobuf.RpcController;
192 import com.google.protobuf.ServiceException;
193 import com.google.protobuf.TextFormat;
194 
195 /**
196  * Implements the regionserver RPC services.
197  */
198 @InterfaceAudience.Private
199 @SuppressWarnings("deprecation")
200 public class RSRpcServices implements HBaseRPCErrorHandler,
201     AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
202     ConfigurationObserver {
203   protected static final Log LOG = LogFactory.getLog(RSRpcServices.class);
204 
205   /** RPC scheduler to use for the region server. */
206   public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
207     "hbase.region.server.rpc.scheduler.factory.class";
208 
209   /**
210    * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
211    * configuration exists to prevent the scenario where a time limit is specified to be so
212    * restrictive that the time limit is reached immediately (before any cells are scanned).
213    */
214   private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
215       "hbase.region.server.rpc.minimum.scan.time.limit.delta";
216   /**
217    * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
218    */
219   private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
220 
221   // Request counter. (Includes requests that are not serviced by regions.)
222   final Counter requestCount = new Counter();
223   // Server to handle client requests.
224   final RpcServerInterface rpcServer;
225   final InetSocketAddress isa;
226 
227   private final HRegionServer regionServer;
228   private final long maxScannerResultSize;
229 
230   // The reference to the priority extraction function
231   private final PriorityFunction priority;
232 
233   private final AtomicLong scannerIdGen = new AtomicLong(0L);
234   private final ConcurrentHashMap<String, RegionScannerHolder> scanners =
235     new ConcurrentHashMap<String, RegionScannerHolder>();
236 
237   /**
238    * The lease timeout period for client scanners (milliseconds).
239    */
240   private final int scannerLeaseTimeoutPeriod;
241 
242   /**
243    * The RPC timeout period (milliseconds)
244    */
245   private final int rpcTimeout;
246 
247   /**
248    * The minimum allowable delta to use for the scan limit
249    */
250   private final long minimumScanTimeLimitDelta;
251 
252   /**
253    * Holder class which holds the RegionScanner and nextCallSeq together.
254    */
255   private static class RegionScannerHolder {
256     private AtomicLong nextCallSeq = new AtomicLong(0);
257     private RegionScanner s;
258     private Region r;
259 
260     public RegionScannerHolder(RegionScanner s, Region r) {
261       this.s = s;
262       this.r = r;
263     }
264 
265     private long getNextCallSeq() {
266       return nextCallSeq.get();
267     }
268 
269     private void incNextCallSeq() {
270       nextCallSeq.incrementAndGet();
271     }
272 
273     private void rollbackNextCallSeq() {
274       nextCallSeq.decrementAndGet();
275     }
276   }
277 
278   /**
279    * Instantiated as a scanner lease. If the lease times out, the scanner is
280    * closed
281    */
282   private class ScannerListener implements LeaseListener {
283     private final String scannerName;
284 
285     ScannerListener(final String n) {
286       this.scannerName = n;
287     }
288 
289     @Override
290     public void leaseExpired() {
291       RegionScannerHolder rsh = scanners.remove(this.scannerName);
292       if (rsh != null) {
293         RegionScanner s = rsh.s;
294         LOG.info("Scanner " + this.scannerName + " lease expired on region "
295           + s.getRegionInfo().getRegionNameAsString());
296         Region region = null;
297         try {
298           region = regionServer.getRegion(s.getRegionInfo().getRegionName());
299           if (region != null && region.getCoprocessorHost() != null) {
300             region.getCoprocessorHost().preScannerClose(s);
301           }
302         } catch (IOException e) {
303           LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
304         } finally {
305           try {
306             s.close();
307             if (region != null && region.getCoprocessorHost() != null) {
308               region.getCoprocessorHost().postScannerClose(s);
309             }
310           } catch (IOException e) {
311             LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
312           }
313         }
314       } else {
315         LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
316           " scanner found, hence no chance to close that related scanner!");
317       }
318     }
319   }
320 
321   private static ResultOrException getResultOrException(
322       final ClientProtos.Result r, final int index, final ClientProtos.RegionLoadStats stats) {
323     return getResultOrException(ResponseConverter.buildActionResult(r, stats), index);
324   }
325 
326   private static ResultOrException getResultOrException(final Exception e, final int index) {
327     return getResultOrException(ResponseConverter.buildActionResult(e), index);
328   }
329 
330   private static ResultOrException getResultOrException(
331       final ResultOrException.Builder builder, final int index) {
332     return builder.setIndex(index).build();
333   }
334 
335   /**
336    * Starts the nonce operation for a mutation, if needed.
337    * @param mutation Mutation.
338    * @param nonceGroup Nonce group from the request.
339    * @returns Nonce used (can be NO_NONCE).
340    */
341   private long startNonceOperation(final MutationProto mutation, long nonceGroup)
342       throws IOException, OperationConflictException {
343     if (regionServer.nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE;
344     boolean canProceed = false;
345     try {
346       canProceed = regionServer.nonceManager.startOperation(
347         nonceGroup, mutation.getNonce(), regionServer);
348     } catch (InterruptedException ex) {
349       throw new InterruptedIOException("Nonce start operation interrupted");
350     }
351     if (!canProceed) {
352       // TODO: instead, we could convert append/increment to get w/mvcc
353       String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()
354         + "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
355         + "] may have already completed";
356       throw new OperationConflictException(message);
357     }
358     return mutation.getNonce();
359   }
360 
361   /**
362    * Ends nonce operation for a mutation, if needed.
363    * @param mutation Mutation.
364    * @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
365    * @param success Whether the operation for this nonce has succeeded.
366    */
367   private void endNonceOperation(final MutationProto mutation,
368       long nonceGroup, boolean success) {
369     if (regionServer.nonceManager != null && mutation.hasNonce()) {
370       regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
371     }
372   }
373 
374   /**
375    * @return True if current call supports cellblocks
376    */
377   private boolean isClientCellBlockSupport() {
378     RpcCallContext context = RpcServer.getCurrentCall();
379     return context != null && context.isClientCellBlockSupported();
380   }
381 
382   private void addResult(final MutateResponse.Builder builder,
383       final Result result, final PayloadCarryingRpcController rpcc) {
384     if (result == null) return;
385     if (isClientCellBlockSupport()) {
386       builder.setResult(ProtobufUtil.toResultNoData(result));
387       rpcc.setCellScanner(result.cellScanner());
388     } else {
389       ClientProtos.Result pbr = ProtobufUtil.toResult(result);
390       builder.setResult(pbr);
391     }
392   }
393 
394   private void addResults(final ScanResponse.Builder builder, final List<Result> results,
395       final RpcController controller, boolean isDefaultRegion) {
396     builder.setStale(!isDefaultRegion);
397     if (results == null || results.isEmpty()) return;
398     if (isClientCellBlockSupport()) {
399       for (Result res : results) {
400         builder.addCellsPerResult(res.size());
401         builder.addPartialFlagPerResult(res.isPartial());
402       }
403       ((PayloadCarryingRpcController)controller).
404         setCellScanner(CellUtil.createCellScanner(results));
405     } else {
406       for (Result res: results) {
407         ClientProtos.Result pbr = ProtobufUtil.toResult(res);
408         builder.addResults(pbr);
409       }
410     }
411   }
412 
413   /**
414    * Mutate a list of rows atomically.
415    *
416    * @param region
417    * @param actions
418    * @param cellScanner if non-null, the mutation data -- the Cell content.
419    * @throws IOException
420    */
421   private ClientProtos.RegionLoadStats mutateRows(final Region region,
422       final List<ClientProtos.Action> actions,
423       final CellScanner cellScanner) throws IOException {
424     if (!region.getRegionInfo().isMetaTable()) {
425       regionServer.cacheFlusher.reclaimMemStoreMemory();
426     }
427     RowMutations rm = null;
428     for (ClientProtos.Action action: actions) {
429       if (action.hasGet()) {
430         throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
431           action.getGet());
432       }
433       MutationType type = action.getMutation().getMutateType();
434       if (rm == null) {
435         rm = new RowMutations(action.getMutation().getRow().toByteArray());
436       }
437       switch (type) {
438         case PUT:
439           rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
440           break;
441         case DELETE:
442           rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
443           break;
444         default:
445           throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
446       }
447     }
448     region.mutateRow(rm);
449     return ((HRegion)region).getRegionStats();
450   }
451 
452   /**
453    * Mutate a list of rows atomically.
454    *
455    * @param region
456    * @param actions
457    * @param cellScanner if non-null, the mutation data -- the Cell content.
458    * @param row
459    * @param family
460    * @param qualifier
461    * @param compareOp
462    * @param comparator @throws IOException
463    */
464   private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions,
465       final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
466       CompareOp compareOp, ByteArrayComparable comparator) throws IOException {
467     if (!region.getRegionInfo().isMetaTable()) {
468       regionServer.cacheFlusher.reclaimMemStoreMemory();
469     }
470     RowMutations rm = null;
471     for (ClientProtos.Action action: actions) {
472       if (action.hasGet()) {
473         throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
474             action.getGet());
475       }
476       MutationType type = action.getMutation().getMutateType();
477       if (rm == null) {
478         rm = new RowMutations(action.getMutation().getRow().toByteArray());
479       }
480       switch (type) {
481         case PUT:
482           rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
483           break;
484         case DELETE:
485           rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
486           break;
487         default:
488           throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
489       }
490     }
491     return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE);
492   }
493 
494   /**
495    * Execute an append mutation.
496    *
497    * @param region
498    * @param m
499    * @param cellScanner
500    * @return result to return to client if default operation should be
501    * bypassed as indicated by RegionObserver, null otherwise
502    * @throws IOException
503    */
504   private Result append(final Region region, final OperationQuota quota, final MutationProto m,
505       final CellScanner cellScanner, long nonceGroup) throws IOException {
506     long before = EnvironmentEdgeManager.currentTime();
507     Append append = ProtobufUtil.toAppend(m, cellScanner);
508     quota.addMutation(append);
509     Result r = null;
510     if (region.getCoprocessorHost() != null) {
511       r = region.getCoprocessorHost().preAppend(append);
512     }
513     if (r == null) {
514       long nonce = startNonceOperation(m, nonceGroup);
515       boolean success = false;
516       try {
517         r = region.append(append, nonceGroup, nonce);
518         success = true;
519       } finally {
520         endNonceOperation(m, nonceGroup, success);
521       }
522       if (region.getCoprocessorHost() != null) {
523         region.getCoprocessorHost().postAppend(append, r);
524       }
525     }
526     if (regionServer.metricsRegionServer != null) {
527       regionServer.metricsRegionServer.updateAppend(
528         EnvironmentEdgeManager.currentTime() - before);
529     }
530     return r;
531   }
532 
533   /**
534    * Execute an increment mutation.
535    *
536    * @param region
537    * @param mutation
538    * @return the Result
539    * @throws IOException
540    */
541   private Result increment(final Region region, final OperationQuota quota,
542       final MutationProto mutation, final CellScanner cells, long nonceGroup) throws IOException {
543     long before = EnvironmentEdgeManager.currentTime();
544     Increment increment = ProtobufUtil.toIncrement(mutation, cells);
545     quota.addMutation(increment);
546     Result r = null;
547     if (region.getCoprocessorHost() != null) {
548       r = region.getCoprocessorHost().preIncrement(increment);
549     }
550     if (r == null) {
551       long nonce = startNonceOperation(mutation, nonceGroup);
552       boolean success = false;
553       try {
554         r = region.increment(increment, nonceGroup, nonce);
555         success = true;
556       } finally {
557         endNonceOperation(mutation, nonceGroup, success);
558       }
559       if (region.getCoprocessorHost() != null) {
560         r = region.getCoprocessorHost().postIncrement(increment, r);
561       }
562     }
563     if (regionServer.metricsRegionServer != null) {
564       regionServer.metricsRegionServer.updateIncrement(
565         EnvironmentEdgeManager.currentTime() - before);
566     }
567     return r;
568   }
569 
570   /**
571    * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
572    * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
573    * @param region
574    * @param actions
575    * @param cellScanner
576    * @param builder
577    * @param cellsToReturn  Could be null. May be allocated in this method.  This is what this
578    * method returns as a 'result'.
579    * @return Return the <code>cellScanner</code> passed
580    */
581   private List<CellScannable> doNonAtomicRegionMutation(final Region region,
582       final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
583       final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup) {
584     // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than do
585     // one at a time, we instead pass them in batch.  Be aware that the corresponding
586     // ResultOrException instance that matches each Put or Delete is then added down in the
587     // doBatchOp call.  We should be staying aligned though the Put and Delete are deferred/batched
588     List<ClientProtos.Action> mutations = null;
589     long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
590     RpcCallContext context = RpcServer.getCurrentCall();
591     IOException sizeIOE = null;
592     Object lastBlock = null;
593     for (ClientProtos.Action action : actions.getActionList()) {
594       ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
595       try {
596         Result r = null;
597 
598         if (context != null
599             && context.isRetryImmediatelySupported()
600             && (context.getResponseCellSize() > maxQuotaResultSize
601               || context.getResponseBlockSize() > maxQuotaResultSize)) {
602 
603           // We're storing the exception since the exception and reason string won't
604           // change after the response size limit is reached.
605           if (sizeIOE == null ) {
606             // We don't need the stack un-winding do don't throw the exception.
607             // Throwing will kill the JVM's JIT.
608             //
609             // Instead just create the exception and then store it.
610             sizeIOE = new MultiActionResultTooLarge("Max size exceeded"
611                 + " CellSize: " + context.getResponseCellSize()
612                 + " BlockSize: " + context.getResponseBlockSize());
613 
614             // Only report the exception once since there's only one request that
615             // caused the exception. Otherwise this number will dominate the exceptions count.
616             rpcServer.getMetrics().exception(sizeIOE);
617           }
618 
619           // Now that there's an exception is known to be created
620           // use it for the response.
621           //
622           // This will create a copy in the builder.
623           resultOrExceptionBuilder = ResultOrException.newBuilder().
624               setException(ResponseConverter.buildException(sizeIOE));
625           resultOrExceptionBuilder.setIndex(action.getIndex());
626           builder.addResultOrException(resultOrExceptionBuilder.build());
627           if (cellScanner != null) {
628             skipCellsForMutation(action, cellScanner);
629           }
630           continue;
631         }
632         if (action.hasGet()) {
633           long before = EnvironmentEdgeManager.currentTime();
634           try {
635             Get get = ProtobufUtil.toGet(action.getGet());
636             r = region.get(get);
637           } finally {
638             if (regionServer.metricsRegionServer != null) {
639               regionServer.metricsRegionServer.updateGet(
640                 EnvironmentEdgeManager.currentTime() - before);
641             }
642           }
643         } else if (action.hasServiceCall()) {
644           resultOrExceptionBuilder = ResultOrException.newBuilder();
645           try {
646             Message result = execServiceOnRegion(region, action.getServiceCall());
647             ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
648                 ClientProtos.CoprocessorServiceResult.newBuilder();
649             resultOrExceptionBuilder.setServiceResult(
650                 serviceResultBuilder.setValue(
651                   serviceResultBuilder.getValueBuilder()
652                     .setName(result.getClass().getName())
653                     .setValue(result.toByteString())));
654           } catch (IOException ioe) {
655             rpcServer.getMetrics().exception(ioe);
656             resultOrExceptionBuilder.setException(ResponseConverter.buildException(ioe));
657           }
658         } else if (action.hasMutation()) {
659           MutationType type = action.getMutation().getMutateType();
660           if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
661               !mutations.isEmpty()) {
662             // Flush out any Puts or Deletes already collected.
663             doBatchOp(builder, region, quota, mutations, cellScanner);
664             mutations.clear();
665           }
666           switch (type) {
667           case APPEND:
668             r = append(region, quota, action.getMutation(), cellScanner, nonceGroup);
669             break;
670           case INCREMENT:
671             r = increment(region, quota, action.getMutation(), cellScanner,  nonceGroup);
672             break;
673           case PUT:
674           case DELETE:
675             // Collect the individual mutations and apply in a batch
676             if (mutations == null) {
677               mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
678             }
679             mutations.add(action);
680             break;
681           default:
682             throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
683           }
684         } else {
685           throw new HBaseIOException("Unexpected Action type");
686         }
687         if (r != null) {
688           ClientProtos.Result pbResult = null;
689           if (isClientCellBlockSupport()) {
690             pbResult = ProtobufUtil.toResultNoData(r);
691             //  Hard to guess the size here.  Just make a rough guess.
692             if (cellsToReturn == null) {
693               cellsToReturn = new ArrayList<CellScannable>();
694             }
695             cellsToReturn.add(r);
696           } else {
697             pbResult = ProtobufUtil.toResult(r);
698           }
699           lastBlock = addSize(context, r, lastBlock);
700           resultOrExceptionBuilder =
701             ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
702         }
703         // Could get to here and there was no result and no exception.  Presumes we added
704         // a Put or Delete to the collecting Mutations List for adding later.  In this
705         // case the corresponding ResultOrException instance for the Put or Delete will be added
706         // down in the doBatchOp method call rather than up here.
707       } catch (IOException ie) {
708         rpcServer.getMetrics().exception(ie);
709         resultOrExceptionBuilder = ResultOrException.newBuilder().
710           setException(ResponseConverter.buildException(ie));
711       }
712       if (resultOrExceptionBuilder != null) {
713         // Propagate index.
714         resultOrExceptionBuilder.setIndex(action.getIndex());
715         builder.addResultOrException(resultOrExceptionBuilder.build());
716       }
717     }
718     // Finish up any outstanding mutations
719     if (mutations != null && !mutations.isEmpty()) {
720       doBatchOp(builder, region, quota, mutations, cellScanner);
721     }
722     return cellsToReturn;
723   }
724 
725   /**
726    * Execute a list of Put/Delete mutations.
727    *
728    * @param builder
729    * @param region
730    * @param mutations
731    */
732   private void doBatchOp(final RegionActionResult.Builder builder, final Region region,
733       final OperationQuota quota,
734       final List<ClientProtos.Action> mutations, final CellScanner cells) {
735     Mutation[] mArray = new Mutation[mutations.size()];
736     long before = EnvironmentEdgeManager.currentTime();
737     boolean batchContainsPuts = false, batchContainsDelete = false;
738     try {
739       int i = 0;
740       for (ClientProtos.Action action: mutations) {
741         MutationProto m = action.getMutation();
742         Mutation mutation;
743         if (m.getMutateType() == MutationType.PUT) {
744           mutation = ProtobufUtil.toPut(m, cells);
745           batchContainsPuts = true;
746         } else {
747           mutation = ProtobufUtil.toDelete(m, cells);
748           batchContainsDelete = true;
749         }
750         mArray[i++] = mutation;
751         quota.addMutation(mutation);
752       }
753 
754       if (!region.getRegionInfo().isMetaTable()) {
755         regionServer.cacheFlusher.reclaimMemStoreMemory();
756       }
757 
758       OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE,
759         HConstants.NO_NONCE);
760       for (i = 0; i < codes.length; i++) {
761         int index = mutations.get(i).getIndex();
762         Exception e = null;
763         switch (codes[i].getOperationStatusCode()) {
764           case BAD_FAMILY:
765             e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
766             builder.addResultOrException(getResultOrException(e, index));
767             break;
768 
769           case SANITY_CHECK_FAILURE:
770             e = new FailedSanityCheckException(codes[i].getExceptionMsg());
771             builder.addResultOrException(getResultOrException(e, index));
772             break;
773 
774           default:
775             e = new DoNotRetryIOException(codes[i].getExceptionMsg());
776             builder.addResultOrException(getResultOrException(e, index));
777             break;
778 
779           case SUCCESS:
780             builder.addResultOrException(getResultOrException(
781                 ClientProtos.Result.getDefaultInstance(), index,
782                 ((HRegion) region).getRegionStats()));
783             break;
784         }
785       }
786     } catch (IOException ie) {
787       for (int i = 0; i < mutations.size(); i++) {
788         builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
789       }
790     }
791     if (regionServer.metricsRegionServer != null) {
792       long after = EnvironmentEdgeManager.currentTime();
793       if (batchContainsPuts) {
794         regionServer.metricsRegionServer.updatePut(after - before);
795       }
796       if (batchContainsDelete) {
797         regionServer.metricsRegionServer.updateDelete(after - before);
798       }
799     }
800   }
801 
802   /**
803    * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
804    * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
805    * @param region
806    * @param mutations
807    * @param replaySeqId
808    * @return an array of OperationStatus which internally contains the OperationStatusCode and the
809    *         exceptionMessage if any
810    * @throws IOException
811    */
812   private OperationStatus [] doReplayBatchOp(final Region region,
813       final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
814     long before = EnvironmentEdgeManager.currentTime();
815     boolean batchContainsPuts = false, batchContainsDelete = false;
816     try {
817       for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
818         WALSplitter.MutationReplay m = it.next();
819 
820         if (m.type == MutationType.PUT) {
821           batchContainsPuts = true;
822         } else {
823           batchContainsDelete = true;
824         }
825 
826         NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
827         List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
828         if (metaCells != null && !metaCells.isEmpty()) {
829           for (Cell metaCell : metaCells) {
830             CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
831             boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
832             HRegion hRegion = (HRegion)region;
833             if (compactionDesc != null) {
834               // replay the compaction. Remove the files from stores only if we are the primary
835               // region replica (thus own the files)
836               hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
837                 replaySeqId);
838               continue;
839             }
840             FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
841             if (flushDesc != null && !isDefaultReplica) {
842               hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
843               continue;
844             }
845             RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
846             if (regionEvent != null && !isDefaultReplica) {
847               hRegion.replayWALRegionEventMarker(regionEvent);
848               continue;
849             }
850             BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
851             if (bulkLoadEvent != null) {
852               hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
853               continue;
854             }
855           }
856           it.remove();
857         }
858       }
859       requestCount.add(mutations.size());
860       if (!region.getRegionInfo().isMetaTable()) {
861         regionServer.cacheFlusher.reclaimMemStoreMemory();
862       }
863       return region.batchReplay(mutations.toArray(
864         new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
865     } finally {
866       if (regionServer.metricsRegionServer != null) {
867         long after = EnvironmentEdgeManager.currentTime();
868           if (batchContainsPuts) {
869           regionServer.metricsRegionServer.updatePut(after - before);
870         }
871         if (batchContainsDelete) {
872           regionServer.metricsRegionServer.updateDelete(after - before);
873         }
874       }
875     }
876   }
877 
878   private void closeAllScanners() {
879     // Close any outstanding scanners. Means they'll get an UnknownScanner
880     // exception next time they come in.
881     for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
882       try {
883         e.getValue().s.close();
884       } catch (IOException ioe) {
885         LOG.warn("Closing scanner " + e.getKey(), ioe);
886       }
887     }
888   }
889 
890   public RSRpcServices(HRegionServer rs) throws IOException {
891     regionServer = rs;
892 
893     RpcSchedulerFactory rpcSchedulerFactory;
894     try {
895       Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
896           REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
897           SimpleRpcSchedulerFactory.class);
898       rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
899     } catch (InstantiationException e) {
900       throw new IllegalArgumentException(e);
901     } catch (IllegalAccessException e) {
902       throw new IllegalArgumentException(e);
903     }
904     // Server to handle client requests.
905     InetSocketAddress initialIsa;
906     InetSocketAddress bindAddress;
907     if(this instanceof MasterRpcServices) {
908       String hostname = getHostname(rs.conf, true);
909       int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
910       // Creation of a HSA will force a resolve.
911       initialIsa = new InetSocketAddress(hostname, port);
912       bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port);
913     } else {
914       String hostname = getHostname(rs.conf, false);
915       int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT,
916         HConstants.DEFAULT_REGIONSERVER_PORT);
917       // Creation of a HSA will force a resolve.
918       initialIsa = new InetSocketAddress(hostname, port);
919       bindAddress = new InetSocketAddress(
920         rs.conf.get("hbase.regionserver.ipc.address", hostname), port);
921     }
922     if (initialIsa.getAddress() == null) {
923       throw new IllegalArgumentException("Failed resolve of " + initialIsa);
924     }
925     priority = createPriority();
926     String name = rs.getProcessName() + "/" + initialIsa.toString();
927     // Set how many times to retry talking to another server over HConnection.
928     ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
929     try {
930       rpcServer = new RpcServer(rs, name, getServices(),
931           bindAddress, // use final bindAddress for this server.
932           rs.conf,
933           rpcSchedulerFactory.create(rs.conf, this, rs));
934       rpcServer.setRsRpcServices(this);
935     } catch (BindException be) {
936       String configName = (this instanceof MasterRpcServices) ? HConstants.MASTER_PORT :
937           HConstants.REGIONSERVER_PORT;
938       throw new IOException(be.getMessage() + ". To switch ports use the '" + configName +
939           "' configuration property.", be.getCause() != null ? be.getCause() : be);
940     }
941 
942     scannerLeaseTimeoutPeriod = rs.conf.getInt(
943       HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
944       HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
945     maxScannerResultSize = rs.conf.getLong(
946       HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
947       HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
948     rpcTimeout = rs.conf.getInt(
949       HConstants.HBASE_RPC_TIMEOUT_KEY,
950       HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
951     minimumScanTimeLimitDelta = rs.conf.getLong(
952       REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
953       DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
954 
955     InetSocketAddress address = rpcServer.getListenerAddress();
956     if (address == null) {
957       throw new IOException("Listener channel is closed");
958     }
959     // Set our address, however we need the final port that was given to rpcServer
960     isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
961     rpcServer.setErrorHandler(this);
962     rs.setName(name);
963   }
964 
965   @Override
966   public void onConfigurationChange(Configuration newConf) {
967     if (rpcServer instanceof ConfigurationObserver) {
968       ((ConfigurationObserver)rpcServer).onConfigurationChange(newConf);
969     }
970   }
971 
972   protected PriorityFunction createPriority() {
973     return new AnnotationReadingPriorityFunction(this);
974   }
975 
976   public static String getHostname(Configuration conf, boolean isMaster)
977       throws UnknownHostException {
978     String hostname = conf.get(isMaster? HRegionServer.MASTER_HOSTNAME_KEY :
979       HRegionServer.RS_HOSTNAME_KEY);
980     if (hostname == null || hostname.isEmpty()) {
981       String masterOrRS = isMaster ? "master" : "regionserver";
982       return Strings.domainNamePointerToHostName(DNS.getDefaultHost(
983         conf.get("hbase." + masterOrRS + ".dns.interface", "default"),
984         conf.get("hbase." + masterOrRS + ".dns.nameserver", "default")));
985     } else {
986       LOG.info("hostname is configured to be " + hostname);
987       return hostname;
988     }
989   }
990 
991   RegionScanner getScanner(long scannerId) {
992     String scannerIdString = Long.toString(scannerId);
993     RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
994     if (scannerHolder != null) {
995       return scannerHolder.s;
996     }
997     return null;
998   }
999 
1000   public String getScanDetailsWithId(long scannerId) {
1001     RegionScanner scanner = getScanner(scannerId);
1002     if (scanner == null) {
1003       return null;
1004     }
1005     StringBuilder builder = new StringBuilder();
1006     builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
1007     builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
1008     return builder.toString();
1009   }
1010 
1011   /**
1012    * Get the vtime associated with the scanner.
1013    * Currently the vtime is the number of "next" calls.
1014    */
1015   long getScannerVirtualTime(long scannerId) {
1016     String scannerIdString = Long.toString(scannerId);
1017     RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
1018     if (scannerHolder != null) {
1019       return scannerHolder.getNextCallSeq();
1020     }
1021     return 0L;
1022   }
1023 
1024   long addScanner(RegionScanner s, Region r) throws LeaseStillHeldException {
1025     long scannerId = this.scannerIdGen.incrementAndGet();
1026     String scannerName = String.valueOf(scannerId);
1027 
1028     RegionScannerHolder existing =
1029       scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
1030     assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
1031 
1032     regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
1033         new ScannerListener(scannerName));
1034     return scannerId;
1035   }
1036 
1037   /**
1038    * Method to account for the size of retained cells and retained data blocks.
1039    * @return an object that represents the last referenced block from this response.
1040    */
1041   Object addSize(RpcCallContext context, Result r, Object lastBlock) {
1042     if (context != null && !r.isEmpty()) {
1043       for (Cell c : r.rawCells()) {
1044         context.incrementResponseCellSize(CellUtil.estimatedHeapSizeOf(c));
1045         // We're using the last block being the same as the current block as
1046         // a proxy for pointing to a new block. This won't be exact.
1047         // If there are multiple gets that bounce back and forth
1048         // Then it's possible that this will over count the size of
1049         // referenced blocks. However it's better to over count and
1050         // use two RPC's than to OOME the RegionServer.
1051         byte[] valueArray = c.getValueArray();
1052         if (valueArray != lastBlock) {
1053           context.incrementResponseBlockSize(valueArray.length);
1054           lastBlock = valueArray;
1055         }
1056       }
1057     }
1058     return lastBlock;
1059   }
1060 
1061 
1062   /**
1063    * Find the HRegion based on a region specifier
1064    *
1065    * @param regionSpecifier the region specifier
1066    * @return the corresponding region
1067    * @throws IOException if the specifier is not null,
1068    *    but failed to find the region
1069    */
1070   Region getRegion(
1071       final RegionSpecifier regionSpecifier) throws IOException {
1072     return regionServer.getRegionByEncodedName(regionSpecifier.getValue().toByteArray(),
1073         ProtobufUtil.getRegionEncodedName(regionSpecifier));
1074   }
1075 
1076   @VisibleForTesting
1077   public PriorityFunction getPriority() {
1078     return priority;
1079   }
1080 
1081   @VisibleForTesting
1082   public Configuration getConfiguration() {
1083     return regionServer.getConfiguration();
1084   }
1085 
1086   private RegionServerQuotaManager getQuotaManager() {
1087     return regionServer.getRegionServerQuotaManager();
1088   }
1089 
1090   void start() {
1091     rpcServer.start();
1092   }
1093 
1094   void stop() {
1095     closeAllScanners();
1096     rpcServer.stop();
1097   }
1098 
1099   /**
1100    * Called to verify that this server is up and running.
1101    *
1102    * @throws IOException
1103    */
1104   protected void checkOpen() throws IOException {
1105     if (regionServer.isAborted()) {
1106       throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
1107     }
1108     if (regionServer.isStopped()) {
1109       throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
1110     }
1111     if (!regionServer.fsOk) {
1112       throw new RegionServerStoppedException("File system not available");
1113     }
1114     if (!regionServer.isOnline()) {
1115       throw new ServerNotRunningYetException("Server is not running yet");
1116     }
1117   }
1118 
1119   /**
1120    * @return list of blocking services and their security info classes that this server supports
1121    */
1122   protected List<BlockingServiceAndInterface> getServices() {
1123     List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(2);
1124     bssi.add(new BlockingServiceAndInterface(
1125       ClientService.newReflectiveBlockingService(this),
1126       ClientService.BlockingInterface.class));
1127     bssi.add(new BlockingServiceAndInterface(
1128       AdminService.newReflectiveBlockingService(this),
1129       AdminService.BlockingInterface.class));
1130     return bssi;
1131   }
1132 
1133   public InetSocketAddress getSocketAddress() {
1134     return isa;
1135   }
1136 
1137   @Override
1138   public int getPriority(RequestHeader header, Message param, User user) {
1139     return priority.getPriority(header, param, user);
1140   }
1141 
1142   @Override
1143   public long getDeadline(RequestHeader header, Message param) {
1144     return priority.getDeadline(header, param);
1145   }
1146 
1147   /*
1148    * Check if an OOME and, if so, abort immediately to avoid creating more objects.
1149    *
1150    * @param e
1151    *
1152    * @return True if we OOME'd and are aborting.
1153    */
1154   @Override
1155   public boolean checkOOME(final Throwable e) {
1156     return exitIfOOME(e);
1157   }
1158 
1159   public static boolean exitIfOOME(final Throwable e ){
1160     boolean stop = false;
1161     try {
1162       if (e instanceof OutOfMemoryError
1163           || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1164           || (e.getMessage() != null && e.getMessage().contains(
1165               "java.lang.OutOfMemoryError"))) {
1166         stop = true;
1167         LOG.fatal("Run out of memory; " + RSRpcServices.class.getSimpleName()
1168           + " will abort itself immediately", e);
1169       }
1170     } finally {
1171       if (stop) {
1172         Runtime.getRuntime().halt(1);
1173       }
1174     }
1175     return stop;
1176   }
1177 
1178   /**
1179    * Close a region on the region server.
1180    *
1181    * @param controller the RPC controller
1182    * @param request the request
1183    * @throws ServiceException
1184    */
1185   @Override
1186   @QosPriority(priority=HConstants.ADMIN_QOS)
1187   public CloseRegionResponse closeRegion(final RpcController controller,
1188       final CloseRegionRequest request) throws ServiceException {
1189     final ServerName sn = (request.hasDestinationServer() ?
1190       ProtobufUtil.toServerName(request.getDestinationServer()) : null);
1191 
1192     try {
1193       checkOpen();
1194       if (request.hasServerStartCode()) {
1195         // check that we are the same server that this RPC is intended for.
1196         long serverStartCode = request.getServerStartCode();
1197         if (regionServer.serverName.getStartcode() !=  serverStartCode) {
1198           throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1199               "different server with startCode: " + serverStartCode + ", this server is: "
1200               + regionServer.serverName));
1201         }
1202       }
1203       final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1204 
1205       requestCount.increment();
1206       LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1207       CloseRegionCoordination.CloseRegionDetails crd = regionServer.getCoordinatedStateManager()
1208         .getCloseRegionCoordination().parseFromProtoRequest(request);
1209 
1210       boolean closed = regionServer.closeRegion(encodedRegionName, false, crd, sn);
1211       CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1212       return builder.build();
1213     } catch (IOException ie) {
1214       throw new ServiceException(ie);
1215     }
1216   }
1217 
1218   /**
1219    * Compact a region on the region server.
1220    *
1221    * @param controller the RPC controller
1222    * @param request the request
1223    * @throws ServiceException
1224    */
1225   @Override
1226   @QosPriority(priority=HConstants.ADMIN_QOS)
1227   public CompactRegionResponse compactRegion(final RpcController controller,
1228       final CompactRegionRequest request) throws ServiceException {
1229     try {
1230       checkOpen();
1231       requestCount.increment();
1232       Region region = getRegion(request.getRegion());
1233       region.startRegionOperation(Operation.COMPACT_REGION);
1234       LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1235       boolean major = false;
1236       byte [] family = null;
1237       Store store = null;
1238       if (request.hasFamily()) {
1239         family = request.getFamily().toByteArray();
1240         store = region.getStore(family);
1241         if (store == null) {
1242           throw new ServiceException(new IOException("column family " + Bytes.toString(family)
1243             + " does not exist in region " + region.getRegionInfo().getRegionNameAsString()));
1244         }
1245       }
1246       if (request.hasMajor()) {
1247         major = request.getMajor();
1248       }
1249       if (major) {
1250         if (family != null) {
1251           store.triggerMajorCompaction();
1252         } else {
1253           region.triggerMajorCompaction();
1254         }
1255       }
1256 
1257       String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):"";
1258       if (LOG.isTraceEnabled()) {
1259         LOG.trace("User-triggered compaction requested for region "
1260           + region.getRegionInfo().getRegionNameAsString() + familyLogMsg);
1261       }
1262       String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
1263       if(family != null) {
1264         regionServer.compactSplitThread.requestCompaction(region, store, log,
1265           Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1266       } else {
1267         regionServer.compactSplitThread.requestCompaction(region, log,
1268           Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1269       }
1270       return CompactRegionResponse.newBuilder().build();
1271     } catch (IOException ie) {
1272       throw new ServiceException(ie);
1273     }
1274   }
1275 
1276   /**
1277    * Flush a region on the region server.
1278    *
1279    * @param controller the RPC controller
1280    * @param request the request
1281    * @throws ServiceException
1282    */
1283   @Override
1284   @QosPriority(priority=HConstants.ADMIN_QOS)
1285   public FlushRegionResponse flushRegion(final RpcController controller,
1286       final FlushRegionRequest request) throws ServiceException {
1287     try {
1288       checkOpen();
1289       requestCount.increment();
1290       Region region = getRegion(request.getRegion());
1291       LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1292       boolean shouldFlush = true;
1293       if (request.hasIfOlderThanTs()) {
1294         shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1295       }
1296       FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1297       if (shouldFlush) {
1298         boolean writeFlushWalMarker =  request.hasWriteFlushWalMarker() ?
1299             request.getWriteFlushWalMarker() : false;
1300         long startTime = EnvironmentEdgeManager.currentTime();
1301         // Go behind the curtain so we can manage writing of the flush WAL marker
1302         HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl)
1303             ((HRegion)region).flushcache(true, writeFlushWalMarker);
1304         if (flushResult.isFlushSucceeded()) {
1305           long endTime = EnvironmentEdgeManager.currentTime();
1306           regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1307         }
1308         boolean compactionNeeded = flushResult.isCompactionNeeded();
1309         if (compactionNeeded) {
1310           regionServer.compactSplitThread.requestSystemCompaction(region,
1311             "Compaction through user triggered flush");
1312         }
1313         builder.setFlushed(flushResult.isFlushSucceeded());
1314         builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1315       }
1316       builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1317       return builder.build();
1318     } catch (DroppedSnapshotException ex) {
1319       // Cache flush can fail in a few places. If it fails in a critical
1320       // section, we get a DroppedSnapshotException and a replay of wal
1321       // is required. Currently the only way to do this is a restart of
1322       // the server.
1323       regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1324       throw new ServiceException(ex);
1325     } catch (IOException ie) {
1326       throw new ServiceException(ie);
1327     }
1328   }
1329 
1330   @Override
1331   @QosPriority(priority=HConstants.ADMIN_QOS)
1332   public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1333       final GetOnlineRegionRequest request) throws ServiceException {
1334     try {
1335       checkOpen();
1336       requestCount.increment();
1337       Map<String, Region> onlineRegions = regionServer.onlineRegions;
1338       List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
1339       for (Region region: onlineRegions.values()) {
1340         list.add(region.getRegionInfo());
1341       }
1342       Collections.sort(list);
1343       return ResponseConverter.buildGetOnlineRegionResponse(list);
1344     } catch (IOException ie) {
1345       throw new ServiceException(ie);
1346     }
1347   }
1348 
1349   @Override
1350   @QosPriority(priority=HConstants.ADMIN_QOS)
1351   public GetRegionInfoResponse getRegionInfo(final RpcController controller,
1352       final GetRegionInfoRequest request) throws ServiceException {
1353     try {
1354       checkOpen();
1355       requestCount.increment();
1356       Region region = getRegion(request.getRegion());
1357       HRegionInfo info = region.getRegionInfo();
1358       GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
1359       builder.setRegionInfo(HRegionInfo.convert(info));
1360       if (request.hasCompactionState() && request.getCompactionState()) {
1361         builder.setCompactionState(region.getCompactionState());
1362       }
1363       builder.setIsRecovering(region.isRecovering());
1364       return builder.build();
1365     } catch (IOException ie) {
1366       throw new ServiceException(ie);
1367     }
1368   }
1369 
1370   /**
1371    * Get some information of the region server.
1372    *
1373    * @param controller the RPC controller
1374    * @param request the request
1375    * @throws ServiceException
1376    */
1377   @Override
1378   @QosPriority(priority=HConstants.ADMIN_QOS)
1379   public GetServerInfoResponse getServerInfo(final RpcController controller,
1380       final GetServerInfoRequest request) throws ServiceException {
1381     try {
1382       checkOpen();
1383     } catch (IOException ie) {
1384       throw new ServiceException(ie);
1385     }
1386     requestCount.increment();
1387     int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
1388     return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
1389   }
1390 
1391   @Override
1392   @QosPriority(priority=HConstants.ADMIN_QOS)
1393   public GetStoreFileResponse getStoreFile(final RpcController controller,
1394       final GetStoreFileRequest request) throws ServiceException {
1395     try {
1396       checkOpen();
1397       Region region = getRegion(request.getRegion());
1398       requestCount.increment();
1399       Set<byte[]> columnFamilies;
1400       if (request.getFamilyCount() == 0) {
1401         columnFamilies = region.getTableDesc().getFamiliesKeys();
1402       } else {
1403         columnFamilies = new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
1404         for (ByteString cf: request.getFamilyList()) {
1405           columnFamilies.add(cf.toByteArray());
1406         }
1407       }
1408       int nCF = columnFamilies.size();
1409       List<String>  fileList = region.getStoreFileList(
1410         columnFamilies.toArray(new byte[nCF][]));
1411       GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1412       builder.addAllStoreFile(fileList);
1413       return builder.build();
1414     } catch (IOException ie) {
1415       throw new ServiceException(ie);
1416     }
1417   }
1418 
1419   /**
1420    * Merge regions on the region server.
1421    *
1422    * @param controller the RPC controller
1423    * @param request the request
1424    * @return merge regions response
1425    * @throws ServiceException
1426    */
1427   @Override
1428   @QosPriority(priority = HConstants.ADMIN_QOS)
1429   public MergeRegionsResponse mergeRegions(final RpcController controller,
1430       final MergeRegionsRequest request) throws ServiceException {
1431     try {
1432       checkOpen();
1433       requestCount.increment();
1434       Region regionA = getRegion(request.getRegionA());
1435       Region regionB = getRegion(request.getRegionB());
1436       boolean forcible = request.getForcible();
1437       long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1438       regionA.startRegionOperation(Operation.MERGE_REGION);
1439       regionB.startRegionOperation(Operation.MERGE_REGION);
1440       if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
1441           regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1442         throw new ServiceException(new MergeRegionException("Can't merge non-default replicas"));
1443       }
1444       LOG.info("Receiving merging request for  " + regionA + ", " + regionB
1445           + ",forcible=" + forcible);
1446       long startTime = EnvironmentEdgeManager.currentTime();
1447       FlushResult flushResult = regionA.flush(true);
1448       if (flushResult.isFlushSucceeded()) {
1449         long endTime = EnvironmentEdgeManager.currentTime();
1450         regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1451       }
1452       startTime = EnvironmentEdgeManager.currentTime();
1453       flushResult = regionB.flush(true);
1454       if (flushResult.isFlushSucceeded()) {
1455         long endTime = EnvironmentEdgeManager.currentTime();
1456         regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1457       }
1458       regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
1459           masterSystemTime, RpcServer.getRequestUser());
1460       return MergeRegionsResponse.newBuilder().build();
1461     } catch (DroppedSnapshotException ex) {
1462       regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1463       throw new ServiceException(ex);
1464     } catch (IOException ie) {
1465       throw new ServiceException(ie);
1466     }
1467   }
1468 
1469   /**
1470    * Open asynchronously a region or a set of regions on the region server.
1471    *
1472    * The opening is coordinated by ZooKeeper, and this method requires the znode to be created
1473    *  before being called. As a consequence, this method should be called only from the master.
1474    * <p>
1475    * Different manages states for the region are:
1476    * </p><ul>
1477    *  <li>region not opened: the region opening will start asynchronously.</li>
1478    *  <li>a close is already in progress: this is considered as an error.</li>
1479    *  <li>an open is already in progress: this new open request will be ignored. This is important
1480    *  because the Master can do multiple requests if it crashes.</li>
1481    *  <li>the region is already opened:  this new open request will be ignored.</li>
1482    *  </ul>
1483    * <p>
1484    * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
1485    * For a single region opening, errors are sent through a ServiceException. For bulk assign,
1486    * errors are put in the response as FAILED_OPENING.
1487    * </p>
1488    * @param controller the RPC controller
1489    * @param request the request
1490    * @throws ServiceException
1491    */
1492   @Override
1493   @QosPriority(priority=HConstants.ADMIN_QOS)
1494   public OpenRegionResponse openRegion(final RpcController controller,
1495       final OpenRegionRequest request) throws ServiceException {
1496     requestCount.increment();
1497     if (request.hasServerStartCode()) {
1498       // check that we are the same server that this RPC is intended for.
1499       long serverStartCode = request.getServerStartCode();
1500       if (regionServer.serverName.getStartcode() !=  serverStartCode) {
1501         throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1502             "different server with startCode: " + serverStartCode + ", this server is: "
1503             + regionServer.serverName));
1504       }
1505     }
1506 
1507     OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
1508     final int regionCount = request.getOpenInfoCount();
1509     final Map<TableName, HTableDescriptor> htds =
1510         new HashMap<TableName, HTableDescriptor>(regionCount);
1511     final boolean isBulkAssign = regionCount > 1;
1512     try {
1513       checkOpen();
1514     } catch (IOException ie) {
1515       TableName tableName = null;
1516       if (regionCount == 1) {
1517         RegionInfo ri = request.getOpenInfo(0).getRegion();
1518         if (ri != null) {
1519           tableName = ProtobufUtil.toTableName(ri.getTableName());
1520         }
1521       }
1522       if (!TableName.META_TABLE_NAME.equals(tableName)) {
1523         throw new ServiceException(ie);
1524       }
1525       // We are assigning meta, wait a little for regionserver to finish initialization.
1526       int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1527         HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout
1528       long endTime = System.currentTimeMillis() + timeout;
1529       synchronized (regionServer.online) {
1530         try {
1531           while (System.currentTimeMillis() <= endTime
1532               && !regionServer.isStopped() && !regionServer.isOnline()) {
1533             regionServer.online.wait(regionServer.msgInterval);
1534           }
1535           checkOpen();
1536         } catch (InterruptedException t) {
1537           Thread.currentThread().interrupt();
1538           throw new ServiceException(t);
1539         } catch (IOException e) {
1540           throw new ServiceException(e);
1541         }
1542       }
1543     }
1544 
1545     long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1546 
1547     for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
1548       final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
1549       OpenRegionCoordination coordination = regionServer.getCoordinatedStateManager().
1550         getOpenRegionCoordination();
1551       OpenRegionCoordination.OpenRegionDetails ord =
1552         coordination.parseFromProtoRequest(regionOpenInfo);
1553 
1554       HTableDescriptor htd;
1555       try {
1556         final Region onlineRegion = regionServer.getFromOnlineRegions(region.getEncodedName());
1557         if (onlineRegion != null) {
1558           //Check if the region can actually be opened.
1559           if (onlineRegion.getCoprocessorHost() != null) {
1560             onlineRegion.getCoprocessorHost().preOpen();
1561           }
1562           // See HBASE-5094. Cross check with hbase:meta if still this RS is owning
1563           // the region.
1564           Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
1565             regionServer.getConnection(), region.getRegionName());
1566           if (regionServer.serverName.equals(p.getSecond())) {
1567             Boolean closing = regionServer.regionsInTransitionInRS.get(region.getEncodedNameAsBytes());
1568             // Map regionsInTransitionInRSOnly has an entry for a region only if the region
1569             // is in transition on this RS, so here closing can be null. If not null, it can
1570             // be true or false. True means the region is opening on this RS; while false
1571             // means the region is closing. Only return ALREADY_OPENED if not closing (i.e.
1572             // not in transition any more, or still transition to open.
1573             if (!Boolean.FALSE.equals(closing)
1574                 && regionServer.getFromOnlineRegions(region.getEncodedName()) != null) {
1575               LOG.warn("Attempted open of " + region.getEncodedName()
1576                 + " but already online on this server");
1577               builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);
1578               continue;
1579             }
1580           } else {
1581             LOG.warn("The region " + region.getEncodedName() + " is online on this server"
1582               + " but hbase:meta does not have this server - continue opening.");
1583             regionServer.removeFromOnlineRegions(onlineRegion, null);
1584           }
1585         }
1586         LOG.info("Open " + region.getRegionNameAsString());
1587         htd = htds.get(region.getTable());
1588         if (htd == null) {
1589           htd = regionServer.tableDescriptors.get(region.getTable());
1590           if (htd == null) {
1591             throw new IOException("missing table descriptor for " + region.getEncodedName());
1592           }
1593           htds.put(region.getTable(), htd);
1594         }
1595 
1596         final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent(
1597           region.getEncodedNameAsBytes(), Boolean.TRUE);
1598 
1599         if (Boolean.FALSE.equals(previous)) {
1600           // There is a close in progress. We need to mark this open as failed in ZK.
1601 
1602           coordination.tryTransitionFromOfflineToFailedOpen(regionServer, region, ord);
1603 
1604           throw new RegionAlreadyInTransitionException("Received OPEN for the region:"
1605             + region.getRegionNameAsString() + " , which we are already trying to CLOSE ");
1606         }
1607 
1608         if (Boolean.TRUE.equals(previous)) {
1609           // An open is in progress. This is supported, but let's log this.
1610           LOG.info("Receiving OPEN for the region:" +
1611             region.getRegionNameAsString() + " , which we are already trying to OPEN"
1612               + " - ignoring this new request for this region.");
1613         }
1614 
1615         // We are opening this region. If it moves back and forth for whatever reason, we don't
1616         // want to keep returning the stale moved record while we are opening/if we close again.
1617         regionServer.removeFromMovedRegions(region.getEncodedName());
1618 
1619         if (previous == null) {
1620           // check if the region to be opened is marked in recovering state in ZK
1621           if (ZKSplitLog.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
1622               region.getEncodedName())) {
1623             // Check if current region open is for distributedLogReplay. This check is to support
1624             // rolling restart/upgrade where we want to Master/RS see same configuration
1625             if (!regionOpenInfo.hasOpenForDistributedLogReplay()
1626                   || regionOpenInfo.getOpenForDistributedLogReplay()) {
1627               regionServer.recoveringRegions.put(region.getEncodedName(), null);
1628             } else {
1629               // Remove stale recovery region from ZK when we open region not for recovering which
1630               // could happen when turn distributedLogReplay off from on.
1631               List<String> tmpRegions = new ArrayList<String>();
1632               tmpRegions.add(region.getEncodedName());
1633               ZKSplitLog.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(),
1634                 tmpRegions);
1635             }
1636           }
1637           // If there is no action in progress, we can submit a specific handler.
1638           // Need to pass the expected version in the constructor.
1639           if (region.isMetaRegion()) {
1640             regionServer.service.submit(new OpenMetaHandler(
1641               regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
1642           } else {
1643             regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
1644               regionOpenInfo.getFavoredNodesList());
1645             regionServer.service.submit(new OpenRegionHandler(
1646               regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
1647           }
1648         }
1649 
1650         builder.addOpeningState(RegionOpeningState.OPENED);
1651 
1652       } catch (KeeperException zooKeeperEx) {
1653         LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx);
1654         throw new ServiceException(zooKeeperEx);
1655       } catch (IOException ie) {
1656         LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
1657         if (isBulkAssign) {
1658           builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
1659         } else {
1660           throw new ServiceException(ie);
1661         }
1662       }
1663     }
1664     return builder.build();
1665   }
1666 
1667   /**
1668    *  Wamrmup a region on this server.
1669    *
1670    * This method should only be called by Master. It synchrnously opens the region and
1671    * closes the region bringing the most important pages in cache.
1672    * <p>
1673    *
1674    * @param controller the RPC controller
1675    * @param request the request
1676    * @throws ServiceException
1677    */
1678   @Override
1679   public WarmupRegionResponse warmupRegion(final RpcController controller,
1680       final WarmupRegionRequest request) throws ServiceException {
1681 
1682     RegionInfo regionInfo = request.getRegionInfo();
1683     final HRegionInfo region = HRegionInfo.convert(regionInfo);
1684     HTableDescriptor htd;
1685     WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
1686 
1687     try {
1688       checkOpen();
1689       String encodedName = region.getEncodedName();
1690       byte[] encodedNameBytes = region.getEncodedNameAsBytes();
1691       final Region onlineRegion = regionServer.getFromOnlineRegions(encodedName);
1692 
1693       if (onlineRegion != null) {
1694         LOG.info("Region already online. Skipping warming up " + region);
1695         return response;
1696       }
1697 
1698       if (LOG.isDebugEnabled()) {
1699         LOG.debug("Warming up Region " + region.getRegionNameAsString());
1700       }
1701 
1702       htd = regionServer.tableDescriptors.get(region.getTable());
1703 
1704       if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
1705         LOG.info("Region is in transition. Skipping warmup " + region);
1706         return response;
1707       }
1708 
1709       HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
1710           regionServer.getConfiguration(), regionServer, null);
1711 
1712     } catch (IOException ie) {
1713       LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
1714       throw new ServiceException(ie);
1715     }
1716 
1717     return response;
1718   }
1719 
1720   /**
1721    * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
1722    * that the given mutations will be durable on the receiving RS if this method returns without any
1723    * exception.
1724    * @param controller the RPC controller
1725    * @param request the request
1726    * @throws ServiceException
1727    */
1728   @Override
1729   @QosPriority(priority = HConstants.REPLAY_QOS)
1730   public ReplicateWALEntryResponse replay(final RpcController controller,
1731       final ReplicateWALEntryRequest request) throws ServiceException {
1732     long before = EnvironmentEdgeManager.currentTime();
1733     CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
1734     try {
1735       checkOpen();
1736       List<WALEntry> entries = request.getEntryList();
1737       if (entries == null || entries.isEmpty()) {
1738         // empty input
1739         return ReplicateWALEntryResponse.newBuilder().build();
1740       }
1741       ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
1742       Region region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
1743       RegionCoprocessorHost coprocessorHost =
1744           ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
1745             ? region.getCoprocessorHost()
1746             : null; // do not invoke coprocessors if this is a secondary region replica
1747       List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
1748 
1749       // Skip adding the edits to WAL if this is a secondary region replica
1750       boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1751       Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
1752 
1753       for (WALEntry entry : entries) {
1754         if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
1755           throw new NotServingRegionException("Replay request contains entries from multiple " +
1756               "regions. First region:" + regionName.toStringUtf8() + " , other region:"
1757               + entry.getKey().getEncodedRegionName());
1758         }
1759         if (regionServer.nonceManager != null && isPrimary) {
1760           long nonceGroup = entry.getKey().hasNonceGroup()
1761             ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
1762           long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
1763           regionServer.nonceManager.reportOperationFromWal(
1764               nonceGroup,
1765               nonce,
1766               entry.getKey().getWriteTime());
1767         }
1768         Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
1769           new Pair<WALKey, WALEdit>();
1770         List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
1771           cells, walEntry, durability);
1772         if (coprocessorHost != null) {
1773           // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
1774           // KeyValue.
1775           if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
1776             walEntry.getSecond())) {
1777             // if bypass this log entry, ignore it ...
1778             continue;
1779           }
1780           walEntries.add(walEntry);
1781         }
1782         if(edits!=null && !edits.isEmpty()) {
1783           long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
1784             entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
1785           OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
1786           // check if it's a partial success
1787           for (int i = 0; result != null && i < result.length; i++) {
1788             if (result[i] != OperationStatus.SUCCESS) {
1789               throw new IOException(result[i].getExceptionMsg());
1790             }
1791           }
1792         }
1793       }
1794 
1795       //sync wal at the end because ASYNC_WAL is used above
1796       WAL wal = getWAL(region);
1797       if (wal != null) {
1798         wal.sync();
1799       }
1800 
1801       if (coprocessorHost != null) {
1802         for (Pair<WALKey, WALEdit> entry : walEntries) {
1803           coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
1804             entry.getSecond());
1805         }
1806       }
1807       return ReplicateWALEntryResponse.newBuilder().build();
1808     } catch (IOException ie) {
1809       throw new ServiceException(ie);
1810     } finally {
1811       if (regionServer.metricsRegionServer != null) {
1812         regionServer.metricsRegionServer.updateReplay(
1813           EnvironmentEdgeManager.currentTime() - before);
1814       }
1815     }
1816   }
1817 
1818   WAL getWAL(Region region) {
1819     return ((HRegion)region).getWAL();
1820   }
1821 
1822   /**
1823    * Replicate WAL entries on the region server.
1824    *
1825    * @param controller the RPC controller
1826    * @param request the request
1827    * @throws ServiceException
1828    */
1829   @Override
1830   @QosPriority(priority=HConstants.REPLICATION_QOS)
1831   public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
1832       final ReplicateWALEntryRequest request) throws ServiceException {
1833     try {
1834       checkOpen();
1835       if (regionServer.replicationSinkHandler != null) {
1836         requestCount.increment();
1837         List<WALEntry> entries = request.getEntryList();
1838         CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
1839         regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
1840         regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner);
1841         regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
1842         return ReplicateWALEntryResponse.newBuilder().build();
1843       } else {
1844         throw new ServiceException("Replication services are not initialized yet");
1845       }
1846     } catch (IOException ie) {
1847       throw new ServiceException(ie);
1848     }
1849   }
1850 
1851   /**
1852    * Roll the WAL writer of the region server.
1853    * @param controller the RPC controller
1854    * @param request the request
1855    * @throws ServiceException
1856    */
1857   @Override
1858   public RollWALWriterResponse rollWALWriter(final RpcController controller,
1859       final RollWALWriterRequest request) throws ServiceException {
1860     try {
1861       checkOpen();
1862       requestCount.increment();
1863       regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
1864       regionServer.walRoller.requestRollAll();
1865       regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
1866       RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
1867       return builder.build();
1868     } catch (IOException ie) {
1869       throw new ServiceException(ie);
1870     }
1871   }
1872 
1873   /**
1874    * Split a region on the region server.
1875    *
1876    * @param controller the RPC controller
1877    * @param request the request
1878    * @throws ServiceException
1879    */
1880   @Override
1881   @QosPriority(priority=HConstants.ADMIN_QOS)
1882   public SplitRegionResponse splitRegion(final RpcController controller,
1883       final SplitRegionRequest request) throws ServiceException {
1884     try {
1885       checkOpen();
1886       requestCount.increment();
1887       Region region = getRegion(request.getRegion());
1888       region.startRegionOperation(Operation.SPLIT_REGION);
1889       if (region.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1890         throw new IOException("Can't split replicas directly. "
1891             + "Replicas are auto-split when their primary is split.");
1892       }
1893       LOG.info("Splitting " + region.getRegionInfo().getRegionNameAsString());
1894       long startTime = EnvironmentEdgeManager.currentTime();
1895       FlushResult flushResult = region.flush(true);
1896       if (flushResult.isFlushSucceeded()) {
1897         long endTime = EnvironmentEdgeManager.currentTime();
1898         regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1899       }
1900       byte[] splitPoint = null;
1901       if (request.hasSplitPoint()) {
1902         splitPoint = request.getSplitPoint().toByteArray();
1903       }
1904       ((HRegion)region).forceSplit(splitPoint);
1905       regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit(),
1906         RpcServer.getRequestUser());
1907       return SplitRegionResponse.newBuilder().build();
1908     } catch (DroppedSnapshotException ex) {
1909       regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1910       throw new ServiceException(ex);
1911     } catch (IOException ie) {
1912       throw new ServiceException(ie);
1913     }
1914   }
1915 
1916   /**
1917    * Stop the region server.
1918    *
1919    * @param controller the RPC controller
1920    * @param request the request
1921    * @throws ServiceException
1922    */
1923   @Override
1924   @QosPriority(priority=HConstants.ADMIN_QOS)
1925   public StopServerResponse stopServer(final RpcController controller,
1926       final StopServerRequest request) throws ServiceException {
1927     requestCount.increment();
1928     String reason = request.getReason();
1929     regionServer.stop(reason);
1930     return StopServerResponse.newBuilder().build();
1931   }
1932 
1933   @Override
1934   public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
1935       UpdateFavoredNodesRequest request) throws ServiceException {
1936     List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
1937     UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
1938     for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
1939       HRegionInfo hri = HRegionInfo.convert(regionUpdateInfo.getRegion());
1940       regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
1941         regionUpdateInfo.getFavoredNodesList());
1942     }
1943     respBuilder.setResponse(openInfoList.size());
1944     return respBuilder.build();
1945   }
1946 
1947   /**
1948    * Atomically bulk load several HFiles into an open region
1949    * @return true if successful, false is failed but recoverably (no action)
1950    * @throws ServiceException if failed unrecoverably
1951    */
1952   @Override
1953   public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
1954       final BulkLoadHFileRequest request) throws ServiceException {
1955     try {
1956       checkOpen();
1957       requestCount.increment();
1958       Region region = getRegion(request.getRegion());
1959       List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
1960       for (FamilyPath familyPath: request.getFamilyPathList()) {
1961         familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
1962           familyPath.getPath()));
1963       }
1964       boolean bypass = false;
1965       if (region.getCoprocessorHost() != null) {
1966         bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
1967       }
1968       boolean loaded = false;
1969       if (!bypass) {
1970         loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
1971       }
1972       if (region.getCoprocessorHost() != null) {
1973         loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
1974       }
1975       BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
1976       builder.setLoaded(loaded);
1977       return builder.build();
1978     } catch (IOException ie) {
1979       throw new ServiceException(ie);
1980     }
1981   }
1982 
1983   @Override
1984   public CoprocessorServiceResponse execService(final RpcController controller,
1985       final CoprocessorServiceRequest request) throws ServiceException {
1986     try {
1987       checkOpen();
1988       requestCount.increment();
1989       Region region = getRegion(request.getRegion());
1990       Message result = execServiceOnRegion(region, request.getCall());
1991       CoprocessorServiceResponse.Builder builder =
1992         CoprocessorServiceResponse.newBuilder();
1993       builder.setRegion(RequestConverter.buildRegionSpecifier(
1994         RegionSpecifierType.REGION_NAME, region.getRegionInfo().getRegionName()));
1995       builder.setValue(
1996         builder.getValueBuilder().setName(result.getClass().getName())
1997           .setValue(result.toByteString()));
1998       return builder.build();
1999     } catch (IOException ie) {
2000       throw new ServiceException(ie);
2001     }
2002   }
2003 
2004   private Message execServiceOnRegion(Region region,
2005       final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
2006     // ignore the passed in controller (from the serialized call)
2007     ServerRpcController execController = new ServerRpcController();
2008     return region.execService(execController, serviceCall);
2009   }
2010 
2011   /**
2012    * Get data from a table.
2013    *
2014    * @param controller the RPC controller
2015    * @param request the get request
2016    * @throws ServiceException
2017    */
2018   @Override
2019   public GetResponse get(final RpcController controller,
2020       final GetRequest request) throws ServiceException {
2021     long before = EnvironmentEdgeManager.currentTime();
2022     OperationQuota quota = null;
2023     try {
2024       checkOpen();
2025       requestCount.increment();
2026       Region region = getRegion(request.getRegion());
2027 
2028       GetResponse.Builder builder = GetResponse.newBuilder();
2029       ClientProtos.Get get = request.getGet();
2030       Boolean existence = null;
2031       Result r = null;
2032       quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
2033 
2034       if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
2035         if (get.getColumnCount() != 1) {
2036           throw new DoNotRetryIOException(
2037             "get ClosestRowBefore supports one and only one family now, not "
2038               + get.getColumnCount() + " families");
2039         }
2040         byte[] row = get.getRow().toByteArray();
2041         byte[] family = get.getColumn(0).getFamily().toByteArray();
2042         r = region.getClosestRowBefore(row, family);
2043       } else {
2044         Get clientGet = ProtobufUtil.toGet(get);
2045         if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2046           existence = region.getCoprocessorHost().preExists(clientGet);
2047         }
2048         if (existence == null) {
2049           r = region.get(clientGet);
2050           if (get.getExistenceOnly()) {
2051             boolean exists = r.getExists();
2052             if (region.getCoprocessorHost() != null) {
2053               exists = region.getCoprocessorHost().postExists(clientGet, exists);
2054             }
2055             existence = exists;
2056           }
2057         }
2058       }
2059       if (existence != null){
2060         ClientProtos.Result pbr =
2061             ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
2062         builder.setResult(pbr);
2063       } else  if (r != null) {
2064         ClientProtos.Result pbr = ProtobufUtil.toResult(r);
2065         builder.setResult(pbr);
2066       }
2067       if (r != null) {
2068         quota.addGetResult(r);
2069       }
2070       return builder.build();
2071     } catch (IOException ie) {
2072       throw new ServiceException(ie);
2073     } finally {
2074       if (regionServer.metricsRegionServer != null) {
2075         regionServer.metricsRegionServer.updateGet(
2076           EnvironmentEdgeManager.currentTime() - before);
2077       }
2078       if (quota != null) {
2079         quota.close();
2080       }
2081     }
2082   }
2083 
2084   /**
2085    * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2086    *
2087    * @param rpcc the RPC controller
2088    * @param request the multi request
2089    * @throws ServiceException
2090    */
2091   @Override
2092   public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2093   throws ServiceException {
2094     try {
2095       checkOpen();
2096     } catch (IOException ie) {
2097       throw new ServiceException(ie);
2098     }
2099 
2100     // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
2101     // It is also the conduit via which we pass back data.
2102     PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2103     CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
2104     if (controller != null) {
2105       controller.setCellScanner(null);
2106     }
2107 
2108     long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2109 
2110     // this will contain all the cells that we need to return. It's created later, if needed.
2111     List<CellScannable> cellsToReturn = null;
2112     MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2113     RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2114     Boolean processed = null;
2115 
2116     for (RegionAction regionAction : request.getRegionActionList()) {
2117       this.requestCount.add(regionAction.getActionCount());
2118       OperationQuota quota;
2119       Region region;
2120       regionActionResultBuilder.clear();
2121       try {
2122         region = getRegion(regionAction.getRegion());
2123         quota = getQuotaManager().checkQuota(region, regionAction.getActionList());
2124       } catch (IOException e) {
2125         rpcServer.getMetrics().exception(e);
2126         regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2127         responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2128         // All Mutations in this RegionAction not executed as we can not see the Region online here
2129         // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
2130         // corresponding to these Mutations.
2131         if (cellScanner != null) {
2132           skipCellsForMutations(regionAction.getActionList(), cellScanner);
2133         }
2134         continue;  // For this region it's a failure.
2135       }
2136 
2137       if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2138         // How does this call happen?  It may need some work to play well w/ the surroundings.
2139         // Need to return an item per Action along w/ Action index.  TODO.
2140         try {
2141           if (request.hasCondition()) {
2142             Condition condition = request.getCondition();
2143             byte[] row = condition.getRow().toByteArray();
2144             byte[] family = condition.getFamily().toByteArray();
2145             byte[] qualifier = condition.getQualifier().toByteArray();
2146             CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2147             ByteArrayComparable comparator =
2148                 ProtobufUtil.toComparator(condition.getComparator());
2149             processed = checkAndRowMutate(region, regionAction.getActionList(),
2150                   cellScanner, row, family, qualifier, compareOp, comparator);
2151           } else {
2152             ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(),
2153                 cellScanner);
2154             // add the stats to the request
2155             if(stats != null) {
2156               responseBuilder.addRegionActionResult(RegionActionResult.newBuilder()
2157                   .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats)));
2158             }
2159             processed = Boolean.TRUE;
2160           }
2161         } catch (IOException e) {
2162           rpcServer.getMetrics().exception(e);
2163           // As it's atomic, we may expect it's a global failure.
2164           regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2165         }
2166       } else {
2167         // doNonAtomicRegionMutation manages the exception internally
2168         cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2169             regionActionResultBuilder, cellsToReturn, nonceGroup);
2170       }
2171       responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2172       quota.close();
2173     }
2174     // Load the controller with the Cells to return.
2175     if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2176       controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2177     }
2178     if (processed != null) {
2179       responseBuilder.setProcessed(processed);
2180     }
2181     return responseBuilder.build();
2182   }
2183 
2184   private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2185     for (Action action : actions) {
2186       skipCellsForMutation(action, cellScanner);
2187     }
2188   }
2189 
2190   private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2191     try {
2192       if (action.hasMutation()) {
2193         MutationProto m = action.getMutation();
2194         if (m.hasAssociatedCellCount()) {
2195           for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2196             cellScanner.advance();
2197           }
2198         }
2199       }
2200     } catch (IOException e) {
2201       // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2202       // marked as failed as we could not see the Region here. At client side the top level
2203       // RegionAction exception will be considered first.
2204       LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2205     }
2206   }
2207 
2208   /**
2209    * Mutate data in a table.
2210    *
2211    * @param rpcc the RPC controller
2212    * @param request the mutate request
2213    * @throws ServiceException
2214    */
2215   @Override
2216   public MutateResponse mutate(final RpcController rpcc,
2217       final MutateRequest request) throws ServiceException {
2218     // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
2219     // It is also the conduit via which we pass back data.
2220     PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2221     CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2222     OperationQuota quota = null;
2223     // Clear scanner so we are not holding on to reference across call.
2224     if (controller != null) {
2225       controller.setCellScanner(null);
2226     }
2227     try {
2228       checkOpen();
2229       requestCount.increment();
2230       Region region = getRegion(request.getRegion());
2231       MutateResponse.Builder builder = MutateResponse.newBuilder();
2232       MutationProto mutation = request.getMutation();
2233       if (!region.getRegionInfo().isMetaTable()) {
2234         regionServer.cacheFlusher.reclaimMemStoreMemory();
2235       }
2236       long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2237       Result r = null;
2238       Boolean processed = null;
2239       MutationType type = mutation.getMutateType();
2240       long mutationSize = 0;
2241       quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
2242       switch (type) {
2243       case APPEND:
2244         // TODO: this doesn't actually check anything.
2245         r = append(region, quota, mutation, cellScanner, nonceGroup);
2246         break;
2247       case INCREMENT:
2248         // TODO: this doesn't actually check anything.
2249         r = increment(region, quota, mutation, cellScanner, nonceGroup);
2250         break;
2251       case PUT:
2252         Put put = ProtobufUtil.toPut(mutation, cellScanner);
2253         quota.addMutation(put);
2254         if (request.hasCondition()) {
2255           Condition condition = request.getCondition();
2256           byte[] row = condition.getRow().toByteArray();
2257           byte[] family = condition.getFamily().toByteArray();
2258           byte[] qualifier = condition.getQualifier().toByteArray();
2259           CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2260           ByteArrayComparable comparator =
2261             ProtobufUtil.toComparator(condition.getComparator());
2262           if (region.getCoprocessorHost() != null) {
2263             processed = region.getCoprocessorHost().preCheckAndPut(
2264               row, family, qualifier, compareOp, comparator, put);
2265           }
2266           if (processed == null) {
2267             boolean result = region.checkAndMutate(row, family,
2268               qualifier, compareOp, comparator, put, true);
2269             if (region.getCoprocessorHost() != null) {
2270               result = region.getCoprocessorHost().postCheckAndPut(row, family,
2271                 qualifier, compareOp, comparator, put, result);
2272             }
2273             processed = result;
2274           }
2275         } else {
2276           region.put(put);
2277           processed = Boolean.TRUE;
2278         }
2279         break;
2280       case DELETE:
2281         Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
2282         quota.addMutation(delete);
2283         if (request.hasCondition()) {
2284           Condition condition = request.getCondition();
2285           byte[] row = condition.getRow().toByteArray();
2286           byte[] family = condition.getFamily().toByteArray();
2287           byte[] qualifier = condition.getQualifier().toByteArray();
2288           CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2289           ByteArrayComparable comparator =
2290             ProtobufUtil.toComparator(condition.getComparator());
2291           if (region.getCoprocessorHost() != null) {
2292             processed = region.getCoprocessorHost().preCheckAndDelete(
2293               row, family, qualifier, compareOp, comparator, delete);
2294           }
2295           if (processed == null) {
2296             boolean result = region.checkAndMutate(row, family,
2297               qualifier, compareOp, comparator, delete, true);
2298             if (region.getCoprocessorHost() != null) {
2299               result = region.getCoprocessorHost().postCheckAndDelete(row, family,
2300                 qualifier, compareOp, comparator, delete, result);
2301             }
2302             processed = result;
2303           }
2304         } else {
2305           region.delete(delete);
2306           processed = Boolean.TRUE;
2307         }
2308         break;
2309       default:
2310           throw new DoNotRetryIOException(
2311             "Unsupported mutate type: " + type.name());
2312       }
2313       if (processed != null) builder.setProcessed(processed.booleanValue());
2314       addResult(builder, r, controller);
2315       return builder.build();
2316     } catch (IOException ie) {
2317       regionServer.checkFileSystem();
2318       throw new ServiceException(ie);
2319     } finally {
2320       if (quota != null) {
2321         quota.close();
2322       }
2323     }
2324   }
2325 
2326   /**
2327    * Scan data in a table.
2328    *
2329    * @param controller the RPC controller
2330    * @param request the scan request
2331    * @throws ServiceException
2332    */
2333   @Override
2334   public ScanResponse scan(final RpcController controller, final ScanRequest request)
2335   throws ServiceException {
2336     OperationQuota quota = null;
2337     Leases.Lease lease = null;
2338     String scannerName = null;
2339     try {
2340       if (!request.hasScannerId() && !request.hasScan()) {
2341         throw new DoNotRetryIOException(
2342           "Missing required input: scannerId or scan");
2343       }
2344       long scannerId = -1;
2345       if (request.hasScannerId()) {
2346         scannerId = request.getScannerId();
2347         scannerName = String.valueOf(scannerId);
2348       }
2349       try {
2350         checkOpen();
2351       } catch (IOException e) {
2352         // If checkOpen failed, server not running or filesystem gone,
2353         // cancel this lease; filesystem is gone or we're closing or something.
2354         if (scannerName != null) {
2355           LOG.debug("Server shutting down and client tried to access missing scanner "
2356             + scannerName);
2357           if (regionServer.leases != null) {
2358             try {
2359               regionServer.leases.cancelLease(scannerName);
2360             } catch (LeaseException le) {
2361               // No problem, ignore
2362               if (LOG.isTraceEnabled()) {
2363                 LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
2364               }
2365              }
2366           }
2367         }
2368         throw e;
2369       }
2370       requestCount.increment();
2371 
2372       int ttl = 0;
2373       Region region = null;
2374       RegionScanner scanner = null;
2375       RegionScannerHolder rsh = null;
2376       boolean moreResults = true;
2377       boolean closeScanner = false;
2378       boolean isSmallScan = false;
2379       RpcCallContext context = RpcServer.getCurrentCall();
2380       Object lastBlock = null;
2381 
2382       ScanResponse.Builder builder = ScanResponse.newBuilder();
2383       if (request.hasCloseScanner()) {
2384         closeScanner = request.getCloseScanner();
2385       }
2386       int rows = closeScanner ? 0 : 1;
2387       if (request.hasNumberOfRows()) {
2388         rows = request.getNumberOfRows();
2389       }
2390       if (request.hasScannerId()) {
2391         rsh = scanners.get(scannerName);
2392         if (rsh == null) {
2393           LOG.warn("Client tried to access missing scanner " + scannerName);
2394           throw new UnknownScannerException(
2395             "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
2396                 + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
2397                 + "long wait between consecutive client checkins, c) Server may be closing down, "
2398                 + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
2399                 + "possible fix would be increasing the value of"
2400                 + "'hbase.client.scanner.timeout.period' configuration.");
2401         }
2402         scanner = rsh.s;
2403         HRegionInfo hri = scanner.getRegionInfo();
2404         region = regionServer.getRegion(hri.getRegionName());
2405         if (region != rsh.r) { // Yes, should be the same instance
2406           throw new NotServingRegionException("Region was re-opened after the scanner"
2407             + scannerName + " was created: " + hri.getRegionNameAsString());
2408         }
2409       } else {
2410         region = getRegion(request.getRegion());
2411         ClientProtos.Scan protoScan = request.getScan();
2412         boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
2413         Scan scan = ProtobufUtil.toScan(protoScan);
2414         // if the request doesn't set this, get the default region setting.
2415         if (!isLoadingCfsOnDemandSet) {
2416           scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
2417         }
2418 
2419         isSmallScan = scan.isSmall();
2420         if (!scan.hasFamilies()) {
2421           // Adding all families to scanner
2422           for (byte[] family: region.getTableDesc().getFamiliesKeys()) {
2423             scan.addFamily(family);
2424           }
2425         }
2426 
2427         if (region.getCoprocessorHost() != null) {
2428           scanner = region.getCoprocessorHost().preScannerOpen(scan);
2429         }
2430         if (scanner == null) {
2431           scanner = region.getScanner(scan);
2432         }
2433         if (region.getCoprocessorHost() != null) {
2434           scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
2435         }
2436         scannerId = addScanner(scanner, region);
2437         scannerName = String.valueOf(scannerId);
2438         ttl = this.scannerLeaseTimeoutPeriod;
2439       }
2440       if (request.hasRenew() && request.getRenew()) {
2441         rsh = scanners.get(scannerName);
2442         lease = regionServer.leases.removeLease(scannerName);
2443         if (lease != null && rsh != null) {
2444           regionServer.leases.addLease(lease);
2445           // Increment the nextCallSeq value which is the next expected from client.
2446           rsh.incNextCallSeq();
2447         }
2448         return builder.build();
2449       }
2450 
2451       quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
2452       long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
2453       if (rows > 0) {
2454         // if nextCallSeq does not match throw Exception straight away. This needs to be
2455         // performed even before checking of Lease.
2456         // See HBASE-5974
2457         if (request.hasNextCallSeq()) {
2458           if (rsh == null) {
2459             rsh = scanners.get(scannerName);
2460           }
2461           if (rsh != null) {
2462             if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
2463               throw new OutOfOrderScannerNextException(
2464                 "Expected nextCallSeq: " + rsh.getNextCallSeq()
2465                 + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
2466                 "; request=" + TextFormat.shortDebugString(request));
2467             }
2468             // Increment the nextCallSeq value which is the next expected from client.
2469             rsh.incNextCallSeq();
2470           }
2471         }
2472         try {
2473           // Remove lease while its being processed in server; protects against case
2474           // where processing of request takes > lease expiration time.
2475           lease = regionServer.leases.removeLease(scannerName);
2476           List<Result> results = new ArrayList<Result>();
2477 
2478           boolean done = false;
2479           // Call coprocessor. Get region info from scanner.
2480           if (region != null && region.getCoprocessorHost() != null) {
2481             Boolean bypass = region.getCoprocessorHost().preScannerNext(
2482               scanner, results, rows);
2483             if (!results.isEmpty()) {
2484               for (Result r : results) {
2485                 lastBlock = addSize(context, r, lastBlock);
2486               }
2487             }
2488             if (bypass != null && bypass.booleanValue()) {
2489               done = true;
2490             }
2491           }
2492 
2493           if (!done) {
2494             long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
2495             if (maxResultSize <= 0) {
2496               maxResultSize = maxQuotaResultSize;
2497             }
2498             // This is cells inside a row. Default size is 10 so if many versions or many cfs,
2499             // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
2500             // arbitrary 32. TODO: keep record of general size of results being returned.
2501             List<Cell> values = new ArrayList<Cell>(32);
2502             region.startRegionOperation(Operation.SCAN);
2503             try {
2504               int i = 0;
2505               synchronized(scanner) {
2506                 boolean stale = (region.getRegionInfo().getReplicaId() != 0);
2507                 boolean clientHandlesPartials =
2508                     request.hasClientHandlesPartials() && request.getClientHandlesPartials();
2509                 boolean clientHandlesHeartbeats =
2510                     request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
2511 
2512                 // On the server side we must ensure that the correct ordering of partial results is
2513                 // returned to the client to allow them to properly reconstruct the partial results.
2514                 // If the coprocessor host is adding to the result list, we cannot guarantee the
2515                 // correct ordering of partial results and so we prevent partial results from being
2516                 // formed.
2517                 boolean serverGuaranteesOrderOfPartials = results.isEmpty();
2518                 boolean allowPartialResults =
2519                     clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
2520                 boolean moreRows = false;
2521 
2522                 // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
2523                 // certain time threshold on the server. When the time threshold is exceeded, the
2524                 // server stops the scan and sends back whatever Results it has accumulated within
2525                 // that time period (may be empty). Since heartbeat messages have the potential to
2526                 // create partial Results (in the event that the timeout occurs in the middle of a
2527                 // row), we must only generate heartbeat messages when the client can handle both
2528                 // heartbeats AND partials
2529                 boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
2530 
2531                 // Default value of timeLimit is negative to indicate no timeLimit should be
2532                 // enforced.
2533                 long timeLimit = -1;
2534 
2535                 // Set the time limit to be half of the more restrictive timeout value (one of the
2536                 // timeout values must be positive). In the event that both values are positive, the
2537                 // more restrictive of the two is used to calculate the limit.
2538                 if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
2539                   long timeLimitDelta;
2540                   if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
2541                     timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
2542                   } else {
2543                     timeLimitDelta =
2544                         scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
2545                   }
2546                   // Use half of whichever timeout value was more restrictive... But don't allow
2547                   // the time limit to be less than the allowable minimum (could cause an
2548                   // immediatate timeout before scanning any data).
2549                   timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
2550                   timeLimit = System.currentTimeMillis() + timeLimitDelta;
2551                 }
2552 
2553                 final LimitScope sizeScope =
2554                     allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2555                 final LimitScope timeScope =
2556                     allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2557 
2558                 boolean trackMetrics =
2559                     request.hasTrackScanMetrics() && request.getTrackScanMetrics();
2560 
2561                 // Configure with limits for this RPC. Set keep progress true since size progress
2562                 // towards size limit should be kept between calls to nextRaw
2563                 ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
2564                 contextBuilder.setSizeLimit(sizeScope, maxResultSize);
2565                 contextBuilder.setBatchLimit(scanner.getBatch());
2566                 contextBuilder.setTimeLimit(timeScope, timeLimit);
2567                 contextBuilder.setTrackMetrics(trackMetrics);
2568                 ScannerContext scannerContext = contextBuilder.build();
2569 
2570                 boolean limitReached = false;
2571                 while (i < rows) {
2572                   // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
2573                   // batch limit is a limit on the number of cells per Result. Thus, if progress is
2574                   // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
2575                   // reset the batch progress between nextRaw invocations since we don't want the
2576                   // batch progress from previous calls to affect future calls
2577                   scannerContext.setBatchProgress(0);
2578 
2579                   // Collect values to be returned here
2580                   moreRows = scanner.nextRaw(values, scannerContext);
2581 
2582                   if (!values.isEmpty()) {
2583                     final boolean partial = scannerContext.partialResultFormed();
2584                     Result r = Result.create(values, null, stale, partial);
2585                     lastBlock = addSize(context, r, lastBlock);
2586                     results.add(r);
2587                     i++;
2588                   }
2589 
2590                   boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
2591                   boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
2592                   boolean rowLimitReached = i >= rows;
2593                   limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
2594 
2595                   if (limitReached || !moreRows) {
2596                     if (LOG.isTraceEnabled()) {
2597                       LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: "
2598                           + moreRows + " scannerContext: " + scannerContext);
2599                     }
2600                     // We only want to mark a ScanResponse as a heartbeat message in the event that
2601                     // there are more values to be read server side. If there aren't more values,
2602                     // marking it as a heartbeat is wasteful because the client will need to issue
2603                     // another ScanRequest only to realize that they already have all the values
2604                     if (moreRows) {
2605                       // Heartbeat messages occur when the time limit has been reached.
2606                       builder.setHeartbeatMessage(timeLimitReached);
2607                     }
2608                     break;
2609                   }
2610                   values.clear();
2611                 }
2612 
2613                 if (limitReached || moreRows) {
2614                   // We stopped prematurely
2615                   builder.setMoreResultsInRegion(true);
2616                 } else {
2617                   // We didn't get a single batch
2618                   builder.setMoreResultsInRegion(false);
2619                 }
2620 
2621                 // Check to see if the client requested that we track metrics server side. If the
2622                 // client requested metrics, retrieve the metrics from the scanner context.
2623                 if (trackMetrics) {
2624                   Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
2625                   ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
2626                   NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
2627 
2628                   for (Entry<String, Long> entry : metrics.entrySet()) {
2629                     pairBuilder.setName(entry.getKey());
2630                     pairBuilder.setValue(entry.getValue());
2631                     metricBuilder.addMetrics(pairBuilder.build());
2632                   }
2633 
2634                   builder.setScanMetrics(metricBuilder.build());
2635                 }
2636               }
2637               region.updateReadRequestsCount(i);
2638               long responseCellSize = context != null ? context.getResponseCellSize() : 0;
2639               region.getMetrics().updateScanNext(responseCellSize);
2640               if (regionServer.metricsRegionServer != null) {
2641                 regionServer.metricsRegionServer.updateScannerNext(responseCellSize);
2642               }
2643             } finally {
2644               region.closeRegionOperation();
2645             }
2646 
2647             // coprocessor postNext hook
2648             if (region != null && region.getCoprocessorHost() != null) {
2649               region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
2650             }
2651           }
2652 
2653           quota.addScanResult(results);
2654 
2655           // If the scanner's filter - if any - is done with the scan
2656           // and wants to tell the client to stop the scan. This is done by passing
2657           // a null result, and setting moreResults to false.
2658           if (scanner.isFilterDone() && results.isEmpty()) {
2659             moreResults = false;
2660             results = null;
2661           } else {
2662             addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
2663           }
2664         } catch (IOException e) {
2665           // The scanner state might be left in a dirty state, so we will tell the Client to
2666           // fail this RPC and close the scanner while opening up another one from the start of
2667           // row that the client has last seen.
2668           closeScanner(region, scanner, scannerName);
2669 
2670           // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
2671           // used in two different semantics.
2672           // (1) The first is to close the client scanner and bubble up the exception all the way
2673           // to the application. This is preferred when the exception is really un-recoverable
2674           // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
2675           // bucket usually.
2676           // (2) Second semantics is to close the current region scanner only, but continue the
2677           // client scanner by overriding the exception. This is usually UnknownScannerException,
2678           // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
2679           // application-level ClientScanner has to continue without bubbling up the exception to
2680           // the client. See ClientScanner code to see how it deals with these special exceptions.
2681           if (e instanceof DoNotRetryIOException) {
2682             throw e;
2683           }
2684 
2685           // If it is a FileNotFoundException, wrap as a
2686           // DoNotRetryIOException. This can avoid the retry in ClientScanner.
2687           if (e instanceof FileNotFoundException) {
2688             throw new DoNotRetryIOException(e);
2689           }
2690 
2691           // We closed the scanner already. Instead of throwing the IOException, and client
2692           // retrying with the same scannerId only to get USE on the next RPC, we directly throw
2693           // a special exception to save an RPC.
2694           if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
2695             // 1.4.0+ clients know how to handle
2696             throw new ScannerResetException("Scanner is closed on the server-side", e);
2697           } else {
2698             // older clients do not know about SRE. Just throw USE, which they will handle
2699             throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
2700                 + " scanner state for clients older than 1.3.", e);
2701           }
2702         } finally {
2703           // We're done. On way out re-add the above removed lease.
2704           // Adding resets expiration time on lease.
2705           if (scanners.containsKey(scannerName)) {
2706             if (lease != null) regionServer.leases.addLease(lease);
2707             ttl = this.scannerLeaseTimeoutPeriod;
2708           }
2709         }
2710       }
2711 
2712       if (!moreResults || closeScanner) {
2713         ttl = 0;
2714         moreResults = false;
2715         closeScanner(region, scanner, scannerName);
2716       }
2717 
2718       if (ttl > 0) {
2719         builder.setTtl(ttl);
2720       }
2721       builder.setScannerId(scannerId);
2722       builder.setMoreResults(moreResults);
2723       return builder.build();
2724     } catch (IOException ie) {
2725       if (scannerName != null && ie instanceof NotServingRegionException) {
2726         RegionScannerHolder rsh = scanners.remove(scannerName);
2727         if (rsh != null) {
2728           try {
2729             RegionScanner scanner = rsh.s;
2730             LOG.warn(scannerName + " encountered " + ie.getMessage() + ", closing ...");
2731             scanner.close();
2732             regionServer.leases.cancelLease(scannerName);
2733           } catch (IOException e) {
2734            LOG.warn("Getting exception closing " + scannerName, e);
2735           }
2736         }
2737       }
2738       throw new ServiceException(ie);
2739     } finally {
2740       if (quota != null) {
2741         quota.close();
2742       }
2743     }
2744   }
2745 
2746   private boolean closeScanner(Region region, RegionScanner scanner, String scannerName)
2747       throws IOException {
2748     if (region != null && region.getCoprocessorHost() != null) {
2749       if (region.getCoprocessorHost().preScannerClose(scanner)) {
2750         return true; // bypass
2751       }
2752     }
2753     RegionScannerHolder rsh = scanners.remove(scannerName);
2754     if (rsh != null) {
2755       scanner = rsh.s;
2756       scanner.close();
2757       try {
2758         regionServer.leases.cancelLease(scannerName);
2759       } catch (LeaseException le) {
2760         // No problem, ignore
2761         if (LOG.isTraceEnabled()) {
2762           LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
2763         }
2764       }
2765       if (region != null && region.getCoprocessorHost() != null) {
2766         region.getCoprocessorHost().postScannerClose(scanner);
2767       }
2768     }
2769     return false;
2770   }
2771 
2772   @Override
2773   public CoprocessorServiceResponse execRegionServerService(RpcController controller,
2774       CoprocessorServiceRequest request) throws ServiceException {
2775     return regionServer.execRegionServerService(controller, request);
2776   }
2777 
2778   @Override
2779   public UpdateConfigurationResponse updateConfiguration(
2780       RpcController controller, UpdateConfigurationRequest request)
2781       throws ServiceException {
2782     try {
2783       this.regionServer.updateConfiguration();
2784     } catch (Exception e) {
2785       throw new ServiceException(e);
2786     }
2787     return UpdateConfigurationResponse.getDefaultInstance();
2788   }
2789 }