001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.io.UncheckedIOException;
023import java.lang.reflect.InvocationTargetException;
024import java.lang.reflect.Method;
025import java.net.BindException;
026import java.net.InetSocketAddress;
027import java.nio.ByteBuffer;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.HashMap;
032import java.util.Iterator;
033import java.util.List;
034import java.util.Map;
035import java.util.Map.Entry;
036import java.util.NavigableMap;
037import java.util.Optional;
038import java.util.Set;
039import java.util.TreeSet;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ConcurrentMap;
042import java.util.concurrent.TimeUnit;
043import java.util.concurrent.atomic.AtomicBoolean;
044import java.util.concurrent.atomic.AtomicLong;
045import java.util.concurrent.atomic.LongAdder;
046import org.apache.commons.lang3.mutable.MutableObject;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.Path;
049import org.apache.hadoop.hbase.ByteBufferExtendedCell;
050import org.apache.hadoop.hbase.CacheEvictionStats;
051import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
052import org.apache.hadoop.hbase.Cell;
053import org.apache.hadoop.hbase.CellScannable;
054import org.apache.hadoop.hbase.CellScanner;
055import org.apache.hadoop.hbase.CellUtil;
056import org.apache.hadoop.hbase.DoNotRetryIOException;
057import org.apache.hadoop.hbase.DroppedSnapshotException;
058import org.apache.hadoop.hbase.HBaseIOException;
059import org.apache.hadoop.hbase.HConstants;
060import org.apache.hadoop.hbase.MultiActionResultTooLarge;
061import org.apache.hadoop.hbase.NotServingRegionException;
062import org.apache.hadoop.hbase.PrivateCellUtil;
063import org.apache.hadoop.hbase.RegionTooBusyException;
064import org.apache.hadoop.hbase.Server;
065import org.apache.hadoop.hbase.ServerName;
066import org.apache.hadoop.hbase.TableName;
067import org.apache.hadoop.hbase.UnknownScannerException;
068import org.apache.hadoop.hbase.client.Append;
069import org.apache.hadoop.hbase.client.CheckAndMutate;
070import org.apache.hadoop.hbase.client.CheckAndMutateResult;
071import org.apache.hadoop.hbase.client.ConnectionUtils;
072import org.apache.hadoop.hbase.client.Delete;
073import org.apache.hadoop.hbase.client.Durability;
074import org.apache.hadoop.hbase.client.Get;
075import org.apache.hadoop.hbase.client.Increment;
076import org.apache.hadoop.hbase.client.Mutation;
077import org.apache.hadoop.hbase.client.Put;
078import org.apache.hadoop.hbase.client.RegionInfo;
079import org.apache.hadoop.hbase.client.RegionReplicaUtil;
080import org.apache.hadoop.hbase.client.Result;
081import org.apache.hadoop.hbase.client.Row;
082import org.apache.hadoop.hbase.client.Scan;
083import org.apache.hadoop.hbase.client.TableDescriptor;
084import org.apache.hadoop.hbase.client.VersionInfoUtil;
085import org.apache.hadoop.hbase.conf.ConfigurationObserver;
086import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
087import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
088import org.apache.hadoop.hbase.exceptions.ScannerResetException;
089import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
090import org.apache.hadoop.hbase.io.ByteBuffAllocator;
091import org.apache.hadoop.hbase.io.hfile.BlockCache;
092import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
093import org.apache.hadoop.hbase.ipc.HBaseRpcController;
094import org.apache.hadoop.hbase.ipc.PriorityFunction;
095import org.apache.hadoop.hbase.ipc.QosPriority;
096import org.apache.hadoop.hbase.ipc.RpcCallContext;
097import org.apache.hadoop.hbase.ipc.RpcCallback;
098import org.apache.hadoop.hbase.ipc.RpcScheduler;
099import org.apache.hadoop.hbase.ipc.RpcServer;
100import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
101import org.apache.hadoop.hbase.ipc.RpcServerFactory;
102import org.apache.hadoop.hbase.ipc.RpcServerInterface;
103import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
104import org.apache.hadoop.hbase.ipc.ServerRpcController;
105import org.apache.hadoop.hbase.log.HBaseMarkers;
106import org.apache.hadoop.hbase.master.HMaster;
107import org.apache.hadoop.hbase.master.MasterRpcServices;
108import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
109import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
110import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
111import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
112import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
113import org.apache.hadoop.hbase.net.Address;
114import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
115import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
116import org.apache.hadoop.hbase.quotas.OperationQuota;
117import org.apache.hadoop.hbase.quotas.QuotaUtil;
118import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
119import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
120import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
121import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
122import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
123import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease;
124import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException;
125import org.apache.hadoop.hbase.regionserver.Region.Operation;
126import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
127import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
128import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler;
129import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
130import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
131import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
132import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
133import org.apache.hadoop.hbase.security.Superusers;
134import org.apache.hadoop.hbase.security.User;
135import org.apache.hadoop.hbase.security.access.AccessChecker;
136import org.apache.hadoop.hbase.security.access.NoopAccessChecker;
137import org.apache.hadoop.hbase.security.access.Permission;
138import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
139import org.apache.hadoop.hbase.util.Bytes;
140import org.apache.hadoop.hbase.util.DNS;
141import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
142import org.apache.hadoop.hbase.util.Pair;
143import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
144import org.apache.hadoop.hbase.wal.WAL;
145import org.apache.hadoop.hbase.wal.WALEdit;
146import org.apache.hadoop.hbase.wal.WALKey;
147import org.apache.hadoop.hbase.wal.WALSplitUtil;
148import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
149import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
150import org.apache.yetus.audience.InterfaceAudience;
151import org.apache.zookeeper.KeeperException;
152import org.slf4j.Logger;
153import org.slf4j.LoggerFactory;
154
155import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
156import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
157import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
158import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
159import org.apache.hbase.thirdparty.com.google.protobuf.Message;
160import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
161import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
162import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
163import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
164import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
165
166import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
167import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
168import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
260
261/**
262 * Implements the regionserver RPC services.
263 */
264@InterfaceAudience.Private
265@SuppressWarnings("deprecation")
266public class RSRpcServices implements HBaseRPCErrorHandler,
267    AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
268    ConfigurationObserver {
  protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);

  /** Configuration key naming the RpcSchedulerFactory class used by the region server. */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

  /** Configuration key naming the RpcSchedulerFactory class used by the master. */
  public static final String MASTER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.master.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
      "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value (in milliseconds) of
   * {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}.
   */
  private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /*
   * Whether to reject rows with size > threshold defined by
   * {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /*
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Server to handle client requests.
  final RpcServerInterface rpcServer;
  // Socket address of the RPC server (presumably the bind address; assigned in the
  // constructor, which is outside this view — confirm there).
  final InetSocketAddress isa;

  protected final HRegionServer regionServer;
  // Upper bound used when sizing scan responses; configured value, set outside this view.
  private final long maxScannerResultSize;

  // The reference to the priority extraction function
  private final PriorityFunction priority;

  // Produces the ids used as keys into {@link #scanners}; initialized outside this view.
  private ScannerIdGenerator scannerIdGenerator;
  // Live client scanners, keyed by scanner name; concurrent because RPC handlers race on it.
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
  // which may send next or close request to a region scanner which has already been exhausted. The
  // entries will be removed automatically after scannerLeaseTimeoutPeriod.
  private final Cache<String, String> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;
  /*
   * Whether we should reject requests with very high no of rows i.e. beyond threshold
   * defined by rowSizeWarnThreshold
   */
  private final boolean rejectRowsWithSizeOverThreshold;

  // Flag guarding clear-compaction-queues requests; presumably ensures only one such request
  // runs at a time — confirm against the clearCompactionQueues RPC handler.
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  // Permission checker for RPCs requiring authorization; initialized outside this view.
  private AccessChecker accessChecker;
  // Watches ZK for permission changes; initialized outside this view.
  private ZKPermissionWatcher zkPermissionWatcher;

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below
   * booleans to selectively enable/disable either Admin or Client Service (Rare is the case
   * where you would ever turn off one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
      "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
      "hbase.regionserver.client.executorService";
377
378  /**
379   * An Rpc callback for closing a RegionScanner.
380   */
381  private static final class RegionScannerCloseCallBack implements RpcCallback {
382
383    private final RegionScanner scanner;
384
385    public RegionScannerCloseCallBack(RegionScanner scanner) {
386      this.scanner = scanner;
387    }
388
389    @Override
390    public void run() throws IOException {
391      this.scanner.close();
392    }
393  }
394
395  /**
396   * An Rpc callback for doing shipped() call on a RegionScanner.
397   */
398  private class RegionScannerShippedCallBack implements RpcCallback {
399
400    private final String scannerName;
401    private final Shipper shipper;
402    private final Lease lease;
403
404    public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
405      this.scannerName = scannerName;
406      this.shipper = shipper;
407      this.lease = lease;
408    }
409
410    @Override
411    public void run() throws IOException {
412      this.shipper.shipped();
413      // We're done. On way out re-add the above removed lease. The lease was temp removed for this
414      // Rpc call and we are at end of the call now. Time to add it back.
415      if (scanners.containsKey(scannerName)) {
416        if (lease != null) {
417          regionServer.getLeaseManager().addLease(lease);
418        }
419      }
420    }
421  }
422
423  /**
424   * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
425   * completion of multiGets.
426   */
427   static class RegionScannersCloseCallBack implements RpcCallback {
428    private final List<RegionScanner> scanners = new ArrayList<>();
429
430    public void addScanner(RegionScanner scanner) {
431      this.scanners.add(scanner);
432    }
433
434    @Override
435    public void run() {
436      for (RegionScanner scanner : scanners) {
437        try {
438          scanner.close();
439        } catch (IOException e) {
440          LOG.error("Exception while closing the scanner " + scanner, e);
441        }
442      }
443    }
444  }
445
  /**
   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
   * Fields are read directly by other members of this file (e.g. {@code rsh.s} in
   * {@link ScannerListener}).
   */
  private static final class RegionScannerHolder {

    // Sequence number of the next expected scan RPC for this scanner; advanced via CAS so that
    // only one scan request at a time can proceed on the scanner.
    private final AtomicLong nextCallSeq = new AtomicLong(0);
    // Key of this holder in the scanners map.
    private final String scannerName;
    // The scanner itself.
    private final RegionScanner s;
    // Region the scanner reads from.
    private final HRegion r;
    // Invoked when the scanner is closed.
    private final RpcCallback closeCallBack;
    // Invoked after a response using this scanner has been shipped.
    private final RpcCallback shippedCallback;
    // Row of the last partial Result returned, used to stitch partial rows across calls;
    // mutated elsewhere in this file.
    private byte[] rowOfLastPartialResult;
    // Whether the client asked for cursor results on heartbeats.
    private boolean needCursor;
    // Whether the originating Scan could end up scanning the whole region.
    private boolean fullRegionScan;

    public RegionScannerHolder(String scannerName, RegionScanner s, HRegion r,
        RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor,
        boolean fullRegionScan) {
      this.scannerName = scannerName;
      this.s = s;
      this.r = r;
      this.closeCallBack = closeCallBack;
      this.shippedCallback = shippedCallback;
      this.needCursor = needCursor;
      this.fullRegionScan = fullRegionScan;
    }

    // Returns the sequence number expected from the next scan RPC.
    public long getNextCallSeq() {
      return nextCallSeq.get();
    }

    // Advances the sequence number iff it still equals currentSeq; returns false when another
    // request already advanced it.
    public boolean incNextCallSeq(long currentSeq) {
      // Use CAS to prevent multiple scan request running on the same scanner.
      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
    }
  }
482
483  /**
484   * Instantiated as a scanner lease. If the lease times out, the scanner is
485   * closed
486   */
487  private class ScannerListener implements LeaseListener {
488    private final String scannerName;
489
490    ScannerListener(final String n) {
491      this.scannerName = n;
492    }
493
494    @Override
495    public void leaseExpired() {
496      RegionScannerHolder rsh = scanners.remove(this.scannerName);
497      if (rsh != null) {
498        RegionScanner s = rsh.s;
499        LOG.info("Scanner " + this.scannerName + " lease expired on region "
500          + s.getRegionInfo().getRegionNameAsString());
501        HRegion region = null;
502        try {
503          region = regionServer.getRegion(s.getRegionInfo().getRegionName());
504          if (region != null && region.getCoprocessorHost() != null) {
505            region.getCoprocessorHost().preScannerClose(s);
506          }
507        } catch (IOException e) {
508          LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
509        } finally {
510          try {
511            s.close();
512            if (region != null && region.getCoprocessorHost() != null) {
513              region.getCoprocessorHost().postScannerClose(s);
514            }
515          } catch (IOException e) {
516            LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
517          }
518        }
519      } else {
520        LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
521          " scanner found, hence no chance to close that related scanner!");
522      }
523    }
524  }
525
526  private static ResultOrException getResultOrException(final ClientProtos.Result r,
527                                                        final int index){
528    return getResultOrException(ResponseConverter.buildActionResult(r), index);
529  }
530
531  private static ResultOrException getResultOrException(final Exception e, final int index) {
532    return getResultOrException(ResponseConverter.buildActionResult(e), index);
533  }
534
535  private static ResultOrException getResultOrException(
536      final ResultOrException.Builder builder, final int index) {
537    return builder.setIndex(index).build();
538  }
539
540  /**
541   * Checks for the following pre-checks in order:
542   * <ol>
543   *   <li>RegionServer is running</li>
544   *   <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
545   * </ol>
546   * @param requestName name of rpc request. Used in reporting failures to provide context.
547   * @throws ServiceException If any of the above listed pre-check fails.
548   */
549  private void rpcPreCheck(String requestName) throws ServiceException {
550    try {
551      checkOpen();
552      requirePermission(requestName, Permission.Action.ADMIN);
553    } catch (IOException ioe) {
554      throw new ServiceException(ioe);
555    }
556  }
557
558  private boolean isClientCellBlockSupport(RpcCallContext context) {
559    return context != null && context.isClientCellBlockSupported();
560  }
561
562  private void addResult(final MutateResponse.Builder builder, final Result result,
563      final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
564    if (result == null) return;
565    if (clientCellBlockSupported) {
566      builder.setResult(ProtobufUtil.toResultNoData(result));
567      rpcc.setCellScanner(result.cellScanner());
568    } else {
569      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
570      builder.setResult(pbr);
571    }
572  }
573
574  private void addResults(ScanResponse.Builder builder, List<Result> results,
575      HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
576    builder.setStale(!isDefaultRegion);
577    if (results.isEmpty()) {
578      return;
579    }
580    if (clientCellBlockSupported) {
581      for (Result res : results) {
582        builder.addCellsPerResult(res.size());
583        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
584      }
585      controller.setCellScanner(CellUtil.createCellScanner(results));
586    } else {
587      for (Result res : results) {
588        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
589        builder.addResults(pbr);
590      }
591    }
592  }
593
  /**
   * Executes the mutation half of a checkAndMutate with multiple actions. Every action must be a
   * mutation (Put/Delete/Increment/Append); a Get fails the call. The cell payloads for all
   * actions share {@code cellScanner}, so the finally block skips the cells of any actions that
   * were never decoded (e.g. after a mid-loop exception) to keep the scanner positioned correctly.
   * Coprocessor pre/post hooks may replace or transform the result.
   * @param region region to run the check-and-mutate against
   * @param actions protobuf actions carrying the mutations
   * @param cellScanner shared scanner holding the cells for every action in the request
   * @param condition the "check" part of the checkAndMutate
   * @param spaceQuotaEnforcement active space-quota policies each mutation is checked against
   * @return the CheckAndMutateResult (success=true with null result when there were no mutations)
   * @throws IOException if an action is not a mutation, a quota/size check fails, or the region
   *   rejects the operation
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
    CellScanner cellScanner, Condition condition, ActivePolicyEnforcement spaceQuotaEnforcement)
    throws IOException {
    // Number of actions whose cells have been fully consumed from the shared cell scanner.
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaRegion()) {
        regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      List<Mutation> mutations = new ArrayList<>();
      for (ClientProtos.Action action: actions) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
            action.getGet());
        }
        MutationType type = action.getMutation().getMutateType();
        switch (type) {
          case PUT:
            // toPut consumes this action's cells, so count it complete before further checks.
            Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            mutations.add(put);
            break;
          case DELETE:
            // No cell-size check for deletes: they remove data rather than add it.
            Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
            ++countOfCompleteMutation;
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            mutations.add(del);
            break;
          case INCREMENT:
            Increment increment = ProtobufUtil.toIncrement(action.getMutation(), cellScanner);
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, increment);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
            mutations.add(increment);
            break;
          case APPEND:
            Append append = ProtobufUtil.toAppend(action.getMutation(), cellScanner);
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, append);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
            mutations.add(append);
            break;
          default:
            throw new DoNotRetryIOException("invalid mutation type : " + type);
        }
      }

      if (mutations.size() == 0) {
        // Nothing to mutate: report success with no result.
        return new CheckAndMutateResult(true, null);
      } else {
        CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
        CheckAndMutateResult result = null;
        // A coprocessor may short-circuit the operation by supplying a result itself.
        if (region.getCoprocessorHost() != null) {
          result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
        }
        if (result == null) {
          result = region.checkAndMutate(checkAndMutate);
          if (region.getCoprocessorHost() != null) {
            result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
          }
        }
        return result;
      }
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }
666
667  /**
668   * Execute an append mutation.
669   *
670   * @return result to return to client if default operation should be
671   * bypassed as indicated by RegionObserver, null otherwise
672   */
673  private Result append(final HRegion region, final OperationQuota quota,
674      final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
675      ActivePolicyEnforcement spaceQuota)
676      throws IOException {
677    long before = EnvironmentEdgeManager.currentTime();
678    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
679    checkCellSizeLimit(region, append);
680    spaceQuota.getPolicyEnforcement(region).check(append);
681    quota.addMutation(append);
682    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
683    Result r = region.append(append, nonceGroup, nonce);
684    if (regionServer.getMetrics() != null) {
685      regionServer.getMetrics().updateAppend(region.getTableDescriptor().getTableName(),
686        EnvironmentEdgeManager.currentTime() - before);
687    }
688    return r == null ? Result.EMPTY_RESULT : r;
689  }
690
691  /**
692   * Execute an increment mutation.
693   */
694  private Result increment(final HRegion region, final OperationQuota quota,
695      final MutationProto mutation, final CellScanner cells, long nonceGroup,
696      ActivePolicyEnforcement spaceQuota)
697      throws IOException {
698    long before = EnvironmentEdgeManager.currentTime();
699    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
700    checkCellSizeLimit(region, increment);
701    spaceQuota.getPolicyEnforcement(region).check(increment);
702    quota.addMutation(increment);
703    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
704    Result r = region.increment(increment, nonceGroup, nonce);
705    final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
706    if (metricsRegionServer != null) {
707      metricsRegionServer.updateIncrement(region.getTableDescriptor().getTableName(),
708        EnvironmentEdgeManager.currentTime() - before);
709    }
710    return r == null ? Result.EMPTY_RESULT : r;
711  }
712
713  /**
714   * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
715   * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
716   * @param cellsToReturn  Could be null. May be allocated in this method.  This is what this
717   * method returns as a 'result'.
718   * @param closeCallBack the callback to be used with multigets
719   * @param context the current RpcCallContext
720   * @return Return the <code>cellScanner</code> passed
721   */
722  private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
723      final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
724      final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
725      final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
726      ActivePolicyEnforcement spaceQuotaEnforcement) {
727    // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than do
728    // one at a time, we instead pass them in batch.  Be aware that the corresponding
729    // ResultOrException instance that matches each Put or Delete is then added down in the
730    // doNonAtomicBatchOp call.  We should be staying aligned though the Put and Delete are
731    // deferred/batched
732    List<ClientProtos.Action> mutations = null;
733    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
734    IOException sizeIOE = null;
735    Object lastBlock = null;
736    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = ResultOrException.newBuilder();
737    boolean hasResultOrException = false;
738    for (ClientProtos.Action action : actions.getActionList()) {
739      hasResultOrException = false;
740      resultOrExceptionBuilder.clear();
741      try {
742        Result r = null;
743
744        if (context != null
745            && context.isRetryImmediatelySupported()
746            && (context.getResponseCellSize() > maxQuotaResultSize
747              || context.getResponseBlockSize() + context.getResponseExceptionSize()
748              > maxQuotaResultSize)) {
749
750          // We're storing the exception since the exception and reason string won't
751          // change after the response size limit is reached.
752          if (sizeIOE == null ) {
753            // We don't need the stack un-winding do don't throw the exception.
754            // Throwing will kill the JVM's JIT.
755            //
756            // Instead just create the exception and then store it.
757            sizeIOE = new MultiActionResultTooLarge("Max size exceeded"
758                + " CellSize: " + context.getResponseCellSize()
759                + " BlockSize: " + context.getResponseBlockSize());
760
761            // Only report the exception once since there's only one request that
762            // caused the exception. Otherwise this number will dominate the exceptions count.
763            rpcServer.getMetrics().exception(sizeIOE);
764          }
765
766          // Now that there's an exception is known to be created
767          // use it for the response.
768          //
769          // This will create a copy in the builder.
770          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
771          resultOrExceptionBuilder.setException(pair);
772          context.incrementResponseExceptionSize(pair.getSerializedSize());
773          resultOrExceptionBuilder.setIndex(action.getIndex());
774          builder.addResultOrException(resultOrExceptionBuilder.build());
775          skipCellsForMutation(action, cellScanner);
776          continue;
777        }
778        if (action.hasGet()) {
779          long before = EnvironmentEdgeManager.currentTime();
780          ClientProtos.Get pbGet = action.getGet();
781          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
782          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
783          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
784          // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
785          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
786            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? " +
787                "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " +
788                "reverse Scan.");
789          }
790          try {
791            Get get = ProtobufUtil.toGet(pbGet);
792            if (context != null) {
793              r = get(get, (region), closeCallBack, context);
794            } else {
795              r = region.get(get);
796            }
797          } finally {
798            final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
799            if (metricsRegionServer != null) {
800              metricsRegionServer.updateGet(
801                  region.getTableDescriptor().getTableName(),
802                  EnvironmentEdgeManager.currentTime() - before);
803            }
804          }
805        } else if (action.hasServiceCall()) {
806          hasResultOrException = true;
807          com.google.protobuf.Message result =
808            execServiceOnRegion(region, action.getServiceCall());
809          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
810            ClientProtos.CoprocessorServiceResult.newBuilder();
811          resultOrExceptionBuilder.setServiceResult(
812            serviceResultBuilder.setValue(
813              serviceResultBuilder.getValueBuilder()
814                .setName(result.getClass().getName())
815                // TODO: Copy!!!
816                .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
817        } else if (action.hasMutation()) {
818          MutationType type = action.getMutation().getMutateType();
819          if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
820              !mutations.isEmpty()) {
821            // Flush out any Puts or Deletes already collected.
822            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
823              spaceQuotaEnforcement);
824            mutations.clear();
825          }
826          switch (type) {
827            case APPEND:
828              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
829                  spaceQuotaEnforcement);
830              break;
831            case INCREMENT:
832              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
833                  spaceQuotaEnforcement);
834              break;
835            case PUT:
836            case DELETE:
837              // Collect the individual mutations and apply in a batch
838              if (mutations == null) {
839                mutations = new ArrayList<>(actions.getActionCount());
840              }
841              mutations.add(action);
842              break;
843            default:
844              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
845          }
846        } else {
847          throw new HBaseIOException("Unexpected Action type");
848        }
849        if (r != null) {
850          ClientProtos.Result pbResult = null;
851          if (isClientCellBlockSupport(context)) {
852            pbResult = ProtobufUtil.toResultNoData(r);
853            //  Hard to guess the size here.  Just make a rough guess.
854            if (cellsToReturn == null) {
855              cellsToReturn = new ArrayList<>();
856            }
857            cellsToReturn.add(r);
858          } else {
859            pbResult = ProtobufUtil.toResult(r);
860          }
861          lastBlock = addSize(context, r, lastBlock);
862          hasResultOrException = true;
863          resultOrExceptionBuilder.setResult(pbResult);
864        }
865        // Could get to here and there was no result and no exception.  Presumes we added
866        // a Put or Delete to the collecting Mutations List for adding later.  In this
867        // case the corresponding ResultOrException instance for the Put or Delete will be added
868        // down in the doNonAtomicBatchOp method call rather than up here.
869      } catch (IOException ie) {
870        rpcServer.getMetrics().exception(ie);
871        hasResultOrException = true;
872        NameBytesPair pair = ResponseConverter.buildException(ie);
873        resultOrExceptionBuilder.setException(pair);
874        context.incrementResponseExceptionSize(pair.getSerializedSize());
875      }
876      if (hasResultOrException) {
877        // Propagate index.
878        resultOrExceptionBuilder.setIndex(action.getIndex());
879        builder.addResultOrException(resultOrExceptionBuilder.build());
880      }
881    }
882    // Finish up any outstanding mutations
883    if (!CollectionUtils.isEmpty(mutations)) {
884      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
885    }
886    return cellsToReturn;
887  }
888
889  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
890    if (r.maxCellSize > 0) {
891      CellScanner cells = m.cellScanner();
892      while (cells.advance()) {
893        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
894        if (size > r.maxCellSize) {
895          String msg = "Cell with size " + size + " exceeds limit of " + r.maxCellSize + " bytes";
896          LOG.debug(msg);
897          throw new DoNotRetryIOException(msg);
898        }
899      }
900    }
901  }
902
  /**
   * Execute the given mutations as a single atomic batch (atomic=true indicates the mutateRow API
   * or the batch API with RowMutations).
   * Any IOException is deliberately propagated: the caller catches it and records it as the
   * region-level exception for the whole RegionAction. Leaving the null action result is ok since
   * the null result is viewed as failure by the hbase client, and the region-level exception will
   * replace the null result; see AsyncRequestFutureImpl#receiveMultiAction and
   * AsyncBatchRpcRetryingCaller#onComplete for more details.
   */
  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations,
    final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement)
    throws IOException {
    doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, true);
  }
914
915  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
916    final OperationQuota quota, final List<ClientProtos.Action> mutations,
917    final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
918    try {
919      doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, false);
920    } catch (IOException e) {
921      // Set the exception for each action. The mutations in same RegionAction are group to
922      // different batch and then be processed individually. Hence, we don't set the region-level
923      // exception here for whole RegionAction.
924      for (Action mutation : mutations) {
925        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
926      }
927    }
928  }
929
930  /**
931   * Execute a list of mutations.
932   *
933   * @param builder
934   * @param region
935   * @param mutations
936   */
937  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
938      final OperationQuota quota, final List<ClientProtos.Action> mutations,
939      final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
940      throws IOException {
941    Mutation[] mArray = new Mutation[mutations.size()];
942    long before = EnvironmentEdgeManager.currentTime();
943    boolean batchContainsPuts = false, batchContainsDelete = false;
944    try {
945      /** HBASE-17924
946       * mutationActionMap is a map to map the relation between mutations and actions
947       * since mutation array may have been reoredered.In order to return the right
948       * result or exception to the corresponding actions, We need to know which action
949       * is the mutation belong to. We can't sort ClientProtos.Action array, since they
950       * are bonded to cellscanners.
951       */
952      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
953      int i = 0;
954      for (ClientProtos.Action action: mutations) {
955        if (action.hasGet()) {
956          throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
957            action.getGet());
958        }
959        MutationProto m = action.getMutation();
960        Mutation mutation;
961        switch (m.getMutateType()) {
962          case PUT:
963            mutation = ProtobufUtil.toPut(m, cells);
964            batchContainsPuts = true;
965            break;
966
967          case DELETE:
968            mutation = ProtobufUtil.toDelete(m, cells);
969            batchContainsDelete = true;
970            break;
971
972          case INCREMENT:
973            mutation = ProtobufUtil.toIncrement(m, cells);
974            break;
975
976          case APPEND:
977            mutation = ProtobufUtil.toAppend(m, cells);
978            break;
979
980          default:
981            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
982        }
983        mutationActionMap.put(mutation, action);
984        mArray[i++] = mutation;
985        checkCellSizeLimit(region, mutation);
986        // Check if a space quota disallows this mutation
987        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
988        quota.addMutation(mutation);
989      }
990
991      if (!region.getRegionInfo().isMetaRegion()) {
992        regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
993      }
994
995      // HBASE-17924
996      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
997      // order is preserved as its expected from the client
998      if (!atomic) {
999        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
1000      }
1001
1002      OperationStatus[] codes = region.batchMutate(mArray, atomic, HConstants.NO_NONCE,
1003        HConstants.NO_NONCE);
1004
1005      // When atomic is true, it indicates that the mutateRow API or the batch API with
1006      // RowMutations is called. In this case, we need to merge the results of the
1007      // Increment/Append operations if the mutations include those operations, and set the merged
1008      // result to the first element of the ResultOrException list
1009      if (atomic) {
1010        List<ResultOrException> resultOrExceptions = new ArrayList<>();
1011        List<Result> results = new ArrayList<>();
1012        for (i = 0; i < codes.length; i++) {
1013          if (codes[i].getResult() != null) {
1014            results.add(codes[i].getResult());
1015          }
1016          if (i != 0) {
1017            resultOrExceptions.add(getResultOrException(
1018              ClientProtos.Result.getDefaultInstance(), i));
1019          }
1020        }
1021
1022        if (results.isEmpty()) {
1023          builder.addResultOrException(getResultOrException(
1024            ClientProtos.Result.getDefaultInstance(), 0));
1025        } else {
1026          // Merge the results of the Increment/Append operations
1027          List<Cell> cellList = new ArrayList<>();
1028          for (Result result : results) {
1029            if (result.rawCells() != null) {
1030              cellList.addAll(Arrays.asList(result.rawCells()));
1031            }
1032          }
1033          Result result = Result.create(cellList);
1034
1035          // Set the merged result of the Increment/Append operations to the first element of the
1036          // ResultOrException list
1037          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
1038        }
1039
1040        builder.addAllResultOrException(resultOrExceptions);
1041        return;
1042      }
1043
1044      for (i = 0; i < codes.length; i++) {
1045        Mutation currentMutation = mArray[i];
1046        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
1047        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
1048        Exception e;
1049        switch (codes[i].getOperationStatusCode()) {
1050          case BAD_FAMILY:
1051            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
1052            builder.addResultOrException(getResultOrException(e, index));
1053            break;
1054
1055          case SANITY_CHECK_FAILURE:
1056            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
1057            builder.addResultOrException(getResultOrException(e, index));
1058            break;
1059
1060          default:
1061            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
1062            builder.addResultOrException(getResultOrException(e, index));
1063            break;
1064
1065          case SUCCESS:
1066            builder.addResultOrException(getResultOrException(
1067              ClientProtos.Result.getDefaultInstance(), index));
1068            break;
1069
1070          case STORE_TOO_BUSY:
1071            e = new RegionTooBusyException(codes[i].getExceptionMsg());
1072            builder.addResultOrException(getResultOrException(e, index));
1073            break;
1074        }
1075      }
1076    } finally {
1077      int processedMutationIndex = 0;
1078      for (Action mutation : mutations) {
1079        // The non-null mArray[i] means the cell scanner has been read.
1080        if (mArray[processedMutationIndex++] == null) {
1081          skipCellsForMutation(mutation, cells);
1082        }
1083      }
1084      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1085    }
1086  }
1087
1088  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
1089    boolean batchContainsDelete) {
1090    final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
1091    if (metricsRegionServer != null) {
1092      long after = EnvironmentEdgeManager.currentTime();
1093      if (batchContainsPuts) {
1094        metricsRegionServer.updatePutBatch(
1095            region.getTableDescriptor().getTableName(), after - starttime);
1096      }
1097      if (batchContainsDelete) {
1098        metricsRegionServer.updateDeleteBatch(
1099            region.getTableDescriptor().getTableName(), after - starttime);
1100      }
1101    }
1102  }
1103
1104  /**
1105   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
1106   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
1107   * @param region
1108   * @param mutations
1109   * @param replaySeqId
1110   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
1111   *         exceptionMessage if any
1112   * @throws IOException
1113   */
1114  private OperationStatus [] doReplayBatchOp(final HRegion region,
1115      final List<MutationReplay> mutations, long replaySeqId) throws IOException {
1116    long before = EnvironmentEdgeManager.currentTime();
1117    boolean batchContainsPuts = false, batchContainsDelete = false;
1118    try {
1119      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
1120        MutationReplay m = it.next();
1121
1122        if (m.getType() == MutationType.PUT) {
1123          batchContainsPuts = true;
1124        } else {
1125          batchContainsDelete = true;
1126        }
1127
1128        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
1129        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
1130        if (metaCells != null && !metaCells.isEmpty()) {
1131          for (Cell metaCell : metaCells) {
1132            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
1133            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1134            HRegion hRegion = region;
1135            if (compactionDesc != null) {
1136              // replay the compaction. Remove the files from stores only if we are the primary
1137              // region replica (thus own the files)
1138              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
1139                replaySeqId);
1140              continue;
1141            }
1142            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
1143            if (flushDesc != null && !isDefaultReplica) {
1144              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
1145              continue;
1146            }
1147            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
1148            if (regionEvent != null && !isDefaultReplica) {
1149              hRegion.replayWALRegionEventMarker(regionEvent);
1150              continue;
1151            }
1152            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
1153            if (bulkLoadEvent != null) {
1154              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
1155              continue;
1156            }
1157          }
1158          it.remove();
1159        }
1160      }
1161      requestCount.increment();
1162      if (!region.getRegionInfo().isMetaRegion()) {
1163        regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
1164      }
1165      return region.batchReplay(mutations.toArray(
1166        new MutationReplay[mutations.size()]), replaySeqId);
1167    } finally {
1168      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1169    }
1170  }
1171
1172  private void closeAllScanners() {
1173    // Close any outstanding scanners. Means they'll get an UnknownScanner
1174    // exception next time they come in.
1175    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
1176      try {
1177        e.getValue().s.close();
1178      } catch (IOException ioe) {
1179        LOG.warn("Closing scanner " + e.getKey(), ioe);
1180      }
1181    }
1182  }
1183
  /**
   * Directly invoked only for testing (production code goes through the server's factory path).
   * Builds the RPC services for {@code rs}: resolves the bind address (master vs. regionserver),
   * instantiates the configured RpcSchedulerFactory, creates and wires the RpcServer, and loads
   * scanner/RPC timeout settings from the configuration.
   * @throws IOException if the RPC server cannot be created or its listener channel is closed
   */
  public RSRpcServices(final HRegionServer rs) throws IOException {
    final Configuration conf = rs.getConfiguration();
    regionServer = rs;
    rowSizeWarnThreshold = conf.getInt(
      HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    rejectRowsWithSizeOverThreshold =
      conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);

    // Reflectively instantiate the configured scheduler factory; any reflection failure is a
    // configuration error, hence IllegalArgumentException.
    final RpcSchedulerFactory rpcSchedulerFactory;
    try {
      rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class)
          .getDeclaredConstructor().newInstance();
    } catch (NoSuchMethodException | InvocationTargetException |
        InstantiationException | IllegalAccessException e) {
      throw new IllegalArgumentException(e);
    }
    // Server to handle client requests.
    final InetSocketAddress initialIsa;
    final InetSocketAddress bindAddress;
    // Master and regionserver read their hostname/port/bind-address from different config keys.
    if(this instanceof MasterRpcServices) {
      String hostname = DNS.getHostname(conf, DNS.ServerType.MASTER);
      int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress = new InetSocketAddress(conf.get("hbase.master.ipc.address", hostname), port);
    } else {
      String hostname = DNS.getHostname(conf, DNS.ServerType.REGIONSERVER);
      int port = conf.getInt(HConstants.REGIONSERVER_PORT,
        HConstants.DEFAULT_REGIONSERVER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress =
          new InetSocketAddress(conf.get("hbase.regionserver.ipc.address", hostname), port);
    }
    if (initialIsa.getAddress() == null) {
      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
    }
    priority = createPriority();
    // Using Address means we don't get the IP too. Shorten it more even to just the host name
    // w/o the domain.
    final String name = rs.getProcessName() + "/" +
        Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain();
    // Set how many times to retry talking to another server over Connection.
    ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG);
    rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name);
    rpcServer.setRsRpcServices(this);
    if (!(rs instanceof HMaster)) {
      rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
    }
    scannerLeaseTimeoutPeriod = conf.getInt(
      HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    maxScannerResultSize = conf.getLong(
      HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
    rpcTimeout = conf.getInt(
      HConstants.HBASE_RPC_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = conf.getLong(
      REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);

    final InetSocketAddress address = rpcServer.getListenerAddress();
    if (address == null) {
      throw new IOException("Listener channel is closed");
    }
    // Set our address, however we need the final port that was given to rpcServer
    isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
    rpcServer.setErrorHandler(this);
    rs.setName(name);

    // Closed-scanner ids are remembered for one lease period so late clients get a clean miss.
    closedScanners = CacheBuilder.newBuilder()
        .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }
1259
1260  protected RpcServerInterface createRpcServer(
1261      final Server server,
1262      final RpcSchedulerFactory rpcSchedulerFactory,
1263      final InetSocketAddress bindAddress,
1264      final String name
1265  ) throws IOException {
1266    final Configuration conf = server.getConfiguration();
1267    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
1268    try {
1269      return RpcServerFactory.createRpcServer(server, name, getServices(),
1270          bindAddress, // use final bindAddress for this server.
1271          conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
1272    } catch (BindException be) {
1273      throw new IOException(be.getMessage() + ". To switch ports use the '"
1274          + HConstants.REGIONSERVER_PORT + "' configuration property.",
1275          be.getCause() != null ? be.getCause() : be);
1276    }
1277  }
1278
1279  protected Class<?> getRpcSchedulerFactoryClass() {
1280    final Configuration conf = regionServer.getConfiguration();
1281    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
1282      SimpleRpcSchedulerFactory.class);
1283  }
1284
1285  @Override
1286  public void onConfigurationChange(Configuration newConf) {
1287    if (rpcServer instanceof ConfigurationObserver) {
1288      ((ConfigurationObserver)rpcServer).onConfigurationChange(newConf);
1289    }
1290  }
1291
  /**
   * Build the PriorityFunction used to prioritize incoming RPCs; subclasses may override.
   */
  protected PriorityFunction createPriority() {
    return new AnnotationReadingPriorityFunction(this);
  }
1295
  /**
   * Require that the current request user holds {@code perm} for {@code request}; a no-op when
   * access checking is disabled (accessChecker is null).
   * @throws IOException if the permission check fails
   */
  protected void requirePermission(String request, Permission.Action perm) throws IOException {
    if (accessChecker != null) {
      accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), request, null, perm);
    }
  }
1301
1302  public int getScannersCount() {
1303    return scanners.size();
1304  }
1305
1306  public
1307  RegionScanner getScanner(long scannerId) {
1308    String scannerIdString = Long.toString(scannerId);
1309    RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
1310    if (scannerHolder != null) {
1311      return scannerHolder.s;
1312    }
1313    return null;
1314  }
1315
1316  public String getScanDetailsWithId(long scannerId) {
1317    RegionScanner scanner = getScanner(scannerId);
1318    if (scanner == null) {
1319      return null;
1320    }
1321    StringBuilder builder = new StringBuilder();
1322    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
1323    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
1324    return builder.toString();
1325  }
1326
1327  public String getScanDetailsWithRequest(ScanRequest request) {
1328    try {
1329      if (!request.hasRegion()) {
1330        return null;
1331      }
1332      Region region = getRegion(request.getRegion());
1333      StringBuilder builder = new StringBuilder();
1334      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
1335      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
1336      return builder.toString();
1337    } catch (IOException ignored) {
1338      return null;
1339    }
1340  }
1341
1342  /**
1343   * Get the vtime associated with the scanner.
1344   * Currently the vtime is the number of "next" calls.
1345   */
1346  long getScannerVirtualTime(long scannerId) {
1347    String scannerIdString = Long.toString(scannerId);
1348    RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
1349    if (scannerHolder != null) {
1350      return scannerHolder.getNextCallSeq();
1351    }
1352    return 0L;
1353  }
1354
1355  /**
1356   * Method to account for the size of retained cells and retained data blocks.
1357   * @param context rpc call context
1358   * @param r result to add size.
1359   * @param lastBlock last block to check whether we need to add the block size in context.
1360   * @return an object that represents the last referenced block from this response.
1361   */
1362  Object addSize(RpcCallContext context, Result r, Object lastBlock) {
1363    if (context != null && r != null && !r.isEmpty()) {
1364      for (Cell c : r.rawCells()) {
1365        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
1366
1367        // Since byte buffers can point all kinds of crazy places it's harder to keep track
1368        // of which blocks are kept alive by what byte buffer.
1369        // So we make a guess.
1370        if (c instanceof ByteBufferExtendedCell) {
1371          ByteBufferExtendedCell bbCell = (ByteBufferExtendedCell) c;
1372          ByteBuffer bb = bbCell.getValueByteBuffer();
1373          if (bb != lastBlock) {
1374            context.incrementResponseBlockSize(bb.capacity());
1375            lastBlock = bb;
1376          }
1377        } else {
1378          // We're using the last block being the same as the current block as
1379          // a proxy for pointing to a new block. This won't be exact.
1380          // If there are multiple gets that bounce back and forth
1381          // Then it's possible that this will over count the size of
1382          // referenced blocks. However it's better to over count and
1383          // use two rpcs than to OOME the regionserver.
1384          byte[] valueArray = c.getValueArray();
1385          if (valueArray != lastBlock) {
1386            context.incrementResponseBlockSize(valueArray.length);
1387            lastBlock = valueArray;
1388          }
1389        }
1390
1391      }
1392    }
1393    return lastBlock;
1394  }
1395
1396  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
1397      HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
1398    Lease lease = regionServer.getLeaseManager().createLease(
1399        scannerName, this.scannerLeaseTimeoutPeriod, new ScannerListener(scannerName));
1400    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
1401    RpcCallback closeCallback;
1402    if (s instanceof RpcCallback) {
1403      closeCallback = (RpcCallback) s;
1404    } else {
1405      closeCallback = new RegionScannerCloseCallBack(s);
1406    }
1407
1408    RegionScannerHolder rsh =
1409        new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback,
1410          needCursor, fullRegionScan);
1411    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
1412    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! " +
1413      scannerName;
1414    return rsh;
1415  }
1416
1417  private boolean isFullRegionScan(Scan scan, HRegion region) {
1418    // If the scan start row equals or less than the start key of the region
1419    // and stop row greater than equals end key (if stop row present)
1420    // or if the stop row is empty
1421    // account this as a full region scan
1422    if (Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey())  <= 0
1423      && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0 &&
1424      !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)
1425      || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
1426      return true;
1427    }
1428    return false;
1429  }
1430
1431  /**
1432   * Find the HRegion based on a region specifier
1433   *
1434   * @param regionSpecifier the region specifier
1435   * @return the corresponding region
1436   * @throws IOException if the specifier is not null,
1437   *    but failed to find the region
1438   */
1439  public HRegion getRegion(
1440      final RegionSpecifier regionSpecifier) throws IOException {
1441    return regionServer.getRegion(regionSpecifier.getValue().toByteArray());
1442  }
1443
1444  /**
1445   * Find the List of HRegions based on a list of region specifiers
1446   *
1447   * @param regionSpecifiers the list of region specifiers
1448   * @return the corresponding list of regions
1449   * @throws IOException if any of the specifiers is not null,
1450   *    but failed to find the region
1451   */
1452  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
1453      final CacheEvictionStatsBuilder stats) {
1454    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
1455    for (RegionSpecifier regionSpecifier: regionSpecifiers) {
1456      try {
1457        regions.add(regionServer.getRegion(regionSpecifier.getValue().toByteArray()));
1458      } catch (NotServingRegionException e) {
1459        stats.addException(regionSpecifier.getValue().toByteArray(), e);
1460      }
1461    }
1462    return regions;
1463  }
1464
  /** Returns the priority function used to classify incoming requests. */
  public PriorityFunction getPriority() {
    return priority;
  }

  /** Returns the region server's configuration. */
  public Configuration getConfiguration() {
    return regionServer.getConfiguration();
  }

  /** Returns the RPC quota manager of the owning region server. */
  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return regionServer.getRegionServerRpcQuotaManager();
  }

  /** Returns the space quota manager of the owning region server. */
  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return regionServer.getRegionServerSpaceQuotaManager();
  }
1480
1481  void start(ZKWatcher zkWatcher) {
1482    if (AccessChecker.isAuthorizationSupported(getConfiguration())) {
1483      accessChecker = new AccessChecker(getConfiguration());
1484    } else {
1485      accessChecker = new NoopAccessChecker(getConfiguration());
1486    }
1487    zkPermissionWatcher =
1488      new ZKPermissionWatcher(zkWatcher, accessChecker.getAuthManager(), getConfiguration());
1489    try {
1490      zkPermissionWatcher.start();
1491    } catch (KeeperException e) {
1492      LOG.error("ZooKeeper permission watcher initialization failed", e);
1493    }
1494    this.scannerIdGenerator = new ScannerIdGenerator(this.regionServer.serverName);
1495    rpcServer.start();
1496  }
1497
1498  void stop() {
1499    if (zkPermissionWatcher != null) {
1500      zkPermissionWatcher.close();
1501    }
1502    closeAllScanners();
1503    rpcServer.stop();
1504  }
1505
1506  /**
1507   * Called to verify that this server is up and running.
1508   */
1509  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
1510  protected void checkOpen() throws IOException {
1511    if (regionServer.isAborted()) {
1512      throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
1513    }
1514    if (regionServer.isStopped()) {
1515      throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
1516    }
1517    if (!regionServer.isDataFileSystemOk()) {
1518      throw new RegionServerStoppedException("File system not available");
1519    }
1520    if (!regionServer.isOnline()) {
1521      throw new ServerNotRunningYetException("Server " + regionServer.serverName
1522          + " is not running yet");
1523    }
1524  }
1525
1526  /**
1527   * By default, put up an Admin and a Client Service.
1528   * Set booleans <code>hbase.regionserver.admin.executorService</code> and
1529   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
1530   * Default is that both are enabled.
1531   * @return immutable list of blocking services and the security info classes that this server
1532   * supports
1533   */
1534  protected List<BlockingServiceAndInterface> getServices() {
1535    boolean admin =
1536      getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
1537    boolean client =
1538      getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
1539    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
1540    if (client) {
1541      bssi.add(new BlockingServiceAndInterface(
1542      ClientService.newReflectiveBlockingService(this),
1543      ClientService.BlockingInterface.class));
1544    }
1545    if (admin) {
1546      bssi.add(new BlockingServiceAndInterface(
1547      AdminService.newReflectiveBlockingService(this),
1548      AdminService.BlockingInterface.class));
1549    }
1550    return new org.apache.hbase.thirdparty.com.google.common.collect.
1551        ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
1552  }
1553
  /** Returns the socket address (host and final bound port) this RPC server listens on. */
  public InetSocketAddress getSocketAddress() {
    return isa;
  }

  /** Delegates request priority classification to the configured priority function. */
  @Override
  public int getPriority(RequestHeader header, Message param, User user) {
    return priority.getPriority(header, param, user);
  }

  /** Delegates deadline computation to the configured priority function. */
  @Override
  public long getDeadline(RequestHeader header, Message param) {
    return priority.getDeadline(header, param);
  }
1567
1568  /*
1569   * Check if an OOME and, if so, abort immediately to avoid creating more objects.
1570   *
1571   * @param e
1572   *
1573   * @return True if we OOME'd and are aborting.
1574   */
1575  @Override
1576  public boolean checkOOME(final Throwable e) {
1577    return exitIfOOME(e);
1578  }
1579
1580  public static boolean exitIfOOME(final Throwable e ){
1581    boolean stop = false;
1582    try {
1583      if (e instanceof OutOfMemoryError
1584          || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1585          || (e.getMessage() != null && e.getMessage().contains(
1586              "java.lang.OutOfMemoryError"))) {
1587        stop = true;
1588        LOG.error(HBaseMarkers.FATAL, "Run out of memory; "
1589          + RSRpcServices.class.getSimpleName() + " will abort itself immediately",
1590          e);
1591      }
1592    } finally {
1593      if (stop) {
1594        Runtime.getRuntime().halt(1);
1595      }
1596    }
1597    return stop;
1598  }
1599
1600  /**
1601   * Close a region on the region server.
1602   *
1603   * @param controller the RPC controller
1604   * @param request the request
1605   * @throws ServiceException
1606   */
1607  @Override
1608  @QosPriority(priority=HConstants.ADMIN_QOS)
1609  public CloseRegionResponse closeRegion(final RpcController controller,
1610      final CloseRegionRequest request) throws ServiceException {
1611    final ServerName sn = (request.hasDestinationServer() ?
1612      ProtobufUtil.toServerName(request.getDestinationServer()) : null);
1613
1614    try {
1615      checkOpen();
1616      throwOnWrongStartCode(request);
1617      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1618
1619      requestCount.increment();
1620      if (sn == null) {
1621        LOG.info("Close " + encodedRegionName + " without moving");
1622      } else {
1623        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1624      }
1625      boolean closed = regionServer.closeRegion(encodedRegionName, false, sn);
1626      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1627      return builder.build();
1628    } catch (IOException ie) {
1629      throw new ServiceException(ie);
1630    }
1631  }
1632
1633  /**
1634   * Compact a region on the region server.
1635   *
1636   * @param controller the RPC controller
1637   * @param request the request
1638   * @throws ServiceException
1639   */
1640  @Override
1641  @QosPriority(priority = HConstants.ADMIN_QOS)
1642  public CompactRegionResponse compactRegion(final RpcController controller,
1643      final CompactRegionRequest request) throws ServiceException {
1644    try {
1645      checkOpen();
1646      requestCount.increment();
1647      HRegion region = getRegion(request.getRegion());
1648      // Quota support is enabled, the requesting user is not system/super user
1649      // and a quota policy is enforced that disables compactions.
1650      if (QuotaUtil.isQuotaEnabled(getConfiguration()) &&
1651          !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null)) &&
1652          this.regionServer.getRegionServerSpaceQuotaManager()
1653              .areCompactionsDisabled(region.getTableDescriptor().getTableName())) {
1654        throw new DoNotRetryIOException(
1655            "Compactions on this region are " + "disabled due to a space quota violation.");
1656      }
1657      region.startRegionOperation(Operation.COMPACT_REGION);
1658      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1659      boolean major = request.hasMajor() && request.getMajor();
1660      if (request.hasFamily()) {
1661        byte[] family = request.getFamily().toByteArray();
1662        String log = "User-triggered " + (major ? "major " : "") + "compaction for region " +
1663            region.getRegionInfo().getRegionNameAsString() + " and family " +
1664            Bytes.toString(family);
1665        LOG.trace(log);
1666        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
1667          CompactionLifeCycleTracker.DUMMY);
1668      } else {
1669        String log = "User-triggered " + (major ? "major " : "") + "compaction for region " +
1670            region.getRegionInfo().getRegionNameAsString();
1671        LOG.trace(log);
1672        region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
1673      }
1674      return CompactRegionResponse.newBuilder().build();
1675    } catch (IOException ie) {
1676      throw new ServiceException(ie);
1677    }
1678  }
1679
1680  @Override
1681  public CompactionSwitchResponse compactionSwitch(RpcController controller,
1682      CompactionSwitchRequest request) throws ServiceException {
1683    try {
1684      checkOpen();
1685      requestCount.increment();
1686      boolean prevState = regionServer.compactSplitThread.isCompactionsEnabled();
1687      CompactionSwitchResponse response =
1688          CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
1689      if (prevState == request.getEnabled()) {
1690        // passed in requested state is same as current state. No action required
1691        return response;
1692      }
1693      regionServer.compactSplitThread.switchCompaction(request.getEnabled());
1694      return response;
1695    } catch (IOException ie) {
1696      throw new ServiceException(ie);
1697    }
1698  }
1699
1700  /**
1701   * Flush a region on the region server.
1702   *
1703   * @param controller the RPC controller
1704   * @param request the request
1705   * @throws ServiceException
1706   */
1707  @Override
1708  @QosPriority(priority=HConstants.ADMIN_QOS)
1709  public FlushRegionResponse flushRegion(final RpcController controller,
1710      final FlushRegionRequest request) throws ServiceException {
1711    try {
1712      checkOpen();
1713      requestCount.increment();
1714      HRegion region = getRegion(request.getRegion());
1715      LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1716      boolean shouldFlush = true;
1717      if (request.hasIfOlderThanTs()) {
1718        shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1719      }
1720      FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1721      if (shouldFlush) {
1722        boolean writeFlushWalMarker =  request.hasWriteFlushWalMarker() ?
1723            request.getWriteFlushWalMarker() : false;
1724        // Go behind the curtain so we can manage writing of the flush WAL marker
1725        HRegion.FlushResultImpl flushResult = null;
1726        if (request.hasFamily()) {
1727          List families = new ArrayList();
1728          families.add(request.getFamily().toByteArray());
1729          flushResult =
1730            region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1731        } else {
1732          flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1733        }
1734        boolean compactionNeeded = flushResult.isCompactionNeeded();
1735        if (compactionNeeded) {
1736          regionServer.compactSplitThread.requestSystemCompaction(region,
1737            "Compaction through user triggered flush");
1738        }
1739        builder.setFlushed(flushResult.isFlushSucceeded());
1740        builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1741      }
1742      builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1743      return builder.build();
1744    } catch (DroppedSnapshotException ex) {
1745      // Cache flush can fail in a few places. If it fails in a critical
1746      // section, we get a DroppedSnapshotException and a replay of wal
1747      // is required. Currently the only way to do this is a restart of
1748      // the server.
1749      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1750      throw new ServiceException(ex);
1751    } catch (IOException ie) {
1752      throw new ServiceException(ie);
1753    }
1754  }
1755
1756  @Override
1757  @QosPriority(priority=HConstants.ADMIN_QOS)
1758  public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1759      final GetOnlineRegionRequest request) throws ServiceException {
1760    try {
1761      checkOpen();
1762      requestCount.increment();
1763      Map<String, HRegion> onlineRegions = regionServer.getOnlineRegions();
1764      List<RegionInfo> list = new ArrayList<>(onlineRegions.size());
1765      for (HRegion region: onlineRegions.values()) {
1766        list.add(region.getRegionInfo());
1767      }
1768      list.sort(RegionInfo.COMPARATOR);
1769      return ResponseConverter.buildGetOnlineRegionResponse(list);
1770    } catch (IOException ie) {
1771      throw new ServiceException(ie);
1772    }
1773  }
1774
1775  // Master implementation of this Admin Service differs given it is not
1776  // able to supply detail only known to RegionServer. See note on
1777  // MasterRpcServers#getRegionInfo.
1778  @Override
1779  @QosPriority(priority=HConstants.ADMIN_QOS)
1780  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
1781    final GetRegionInfoRequest request) throws ServiceException {
1782    try {
1783      checkOpen();
1784      requestCount.increment();
1785      HRegion region = getRegion(request.getRegion());
1786      RegionInfo info = region.getRegionInfo();
1787      byte[] bestSplitRow;
1788      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
1789        bestSplitRow = region.checkSplit(true).orElse(null);
1790        // when all table data are in memstore, bestSplitRow = null
1791        // try to flush region first
1792        if (bestSplitRow == null) {
1793          region.flush(true);
1794          bestSplitRow = region.checkSplit(true).orElse(null);
1795        }
1796      } else {
1797        bestSplitRow = null;
1798      }
1799      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
1800      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
1801      if (request.hasCompactionState() && request.getCompactionState()) {
1802        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
1803      }
1804      builder.setSplittable(region.isSplittable());
1805      builder.setMergeable(region.isMergeable());
1806      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
1807        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
1808      }
1809      return builder.build();
1810    } catch (IOException ie) {
1811      throw new ServiceException(ie);
1812    }
1813  }
1814
  /**
   * Builds per-region load reports, either for a single table (when a table name is supplied)
   * or for every region on this server.
   * NOTE(review): unlike sibling admin RPCs, this method does not call checkOpen() or increment
   * requestCount — confirm whether that omission is intentional.
   * @param controller the RPC controller
   * @param request optionally names a table to restrict the report to
   * @return a response carrying one RegionLoad entry per selected region
   * @throws ServiceException wrapping any IOException from load construction
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public GetRegionLoadResponse getRegionLoad(RpcController controller,
      GetRegionLoadRequest request) throws ServiceException {

    List<HRegion> regions;
    if (request.hasTableName()) {
      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
      regions = regionServer.getRegions(tableName);
    } else {
      regions = regionServer.getRegions();
    }
    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
    // Builders are reused across regions; createRegionLoad resets them per call.
    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();

    try {
      for (HRegion region : regions) {
        rLoads.add(regionServer.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
      }
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
    builder.addAllRegionLoads(rLoads);
    return builder.build();
  }
1842
  /**
   * Clears the named compaction queues ("long" and/or "short") on this region server.
   * Guarded by a compare-and-set flag so that only one admin clears queues at a time; a
   * concurrent request is logged and returns without doing anything.
   * @param controller the RPC controller
   * @param request names the queues to clear
   * @return an empty response
   * @throws ServiceException wrapping any IOException (including an unknown queue name)
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
    ClearCompactionQueuesRequest request) throws ServiceException {
    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
        + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
    requestCount.increment();
    // CAS ensures only one clear operation runs at a time; the flag is reset in the finally.
    if (clearCompactionQueues.compareAndSet(false,true)) {
      try {
        checkOpen();
        regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues();
        for (String queueName : request.getQueueNameList()) {
          LOG.debug("clear " + queueName + " compaction queue");
          switch (queueName) {
            case "long":
              regionServer.compactSplitThread.clearLongCompactionsQueue();
              break;
            case "short":
              regionServer.compactSplitThread.clearShortCompactionsQueue();
              break;
            default:
              LOG.warn("Unknown queue name " + queueName);
              throw new IOException("Unknown queue name " + queueName);
          }
        }
        regionServer.getRegionServerCoprocessorHost().postClearCompactionQueues();
      } catch (IOException ie) {
        throw new ServiceException(ie);
      } finally {
        clearCompactionQueues.set(false);
      }
    } else {
      LOG.warn("Clear compactions queue is executing by other admin.");
    }
    return respBuilder.build();
  }
1880
1881  /**
1882   * Get some information of the region server.
1883   *
1884   * @param controller the RPC controller
1885   * @param request the request
1886   * @throws ServiceException
1887   */
1888  @Override
1889  @QosPriority(priority=HConstants.ADMIN_QOS)
1890  public GetServerInfoResponse getServerInfo(final RpcController controller,
1891      final GetServerInfoRequest request) throws ServiceException {
1892    try {
1893      checkOpen();
1894    } catch (IOException ie) {
1895      throw new ServiceException(ie);
1896    }
1897    requestCount.increment();
1898    int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
1899    return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
1900  }
1901
1902  @Override
1903  @QosPriority(priority=HConstants.ADMIN_QOS)
1904  public GetStoreFileResponse getStoreFile(final RpcController controller,
1905      final GetStoreFileRequest request) throws ServiceException {
1906    try {
1907      checkOpen();
1908      HRegion region = getRegion(request.getRegion());
1909      requestCount.increment();
1910      Set<byte[]> columnFamilies;
1911      if (request.getFamilyCount() == 0) {
1912        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
1913      } else {
1914        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
1915        for (ByteString cf: request.getFamilyList()) {
1916          columnFamilies.add(cf.toByteArray());
1917        }
1918      }
1919      int nCF = columnFamilies.size();
1920      List<String>  fileList = region.getStoreFileList(
1921        columnFamilies.toArray(new byte[nCF][]));
1922      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1923      builder.addAllStoreFile(fileList);
1924      return builder.build();
1925    } catch (IOException ie) {
1926      throw new ServiceException(ie);
1927    }
1928  }
1929
1930  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
1931    if (!request.hasServerStartCode()) {
1932      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
1933      return;
1934    }
1935    throwOnWrongStartCode(request.getServerStartCode());
1936  }
1937
1938  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
1939    if (!request.hasServerStartCode()) {
1940      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
1941      return;
1942    }
1943    throwOnWrongStartCode(request.getServerStartCode());
1944  }
1945
1946  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
1947    // check that we are the same server that this RPC is intended for.
1948    if (regionServer.serverName.getStartcode() != serverStartCode) {
1949      throw new ServiceException(new DoNotRetryIOException(
1950        "This RPC was intended for a " + "different server with startCode: " + serverStartCode +
1951          ", this server is: " + regionServer.serverName));
1952    }
1953  }
1954
1955  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
1956    if (req.getOpenRegionCount() > 0) {
1957      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
1958        throwOnWrongStartCode(openReq);
1959      }
1960    }
1961    if (req.getCloseRegionCount() > 0) {
1962      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
1963        throwOnWrongStartCode(closeReq);
1964      }
1965    }
1966  }
1967
1968  /**
1969   * Open asynchronously a region or a set of regions on the region server.
1970   *
1971   * The opening is coordinated by ZooKeeper, and this method requires the znode to be created
1972   *  before being called. As a consequence, this method should be called only from the master.
1973   * <p>
1974   * Different manages states for the region are:
1975   * </p><ul>
1976   *  <li>region not opened: the region opening will start asynchronously.</li>
1977   *  <li>a close is already in progress: this is considered as an error.</li>
1978   *  <li>an open is already in progress: this new open request will be ignored. This is important
1979   *  because the Master can do multiple requests if it crashes.</li>
1980   *  <li>the region is already opened:  this new open request will be ignored.</li>
1981   *  </ul>
1982   * <p>
1983   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
1984   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
1985   * errors are put in the response as FAILED_OPENING.
1986   * </p>
1987   * @param controller the RPC controller
1988   * @param request the request
1989   * @throws ServiceException
1990   */
1991  @Override
1992  @QosPriority(priority=HConstants.ADMIN_QOS)
1993  public OpenRegionResponse openRegion(final RpcController controller,
1994      final OpenRegionRequest request) throws ServiceException {
1995    requestCount.increment();
1996    throwOnWrongStartCode(request);
1997
1998    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
1999    final int regionCount = request.getOpenInfoCount();
2000    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
2001    final boolean isBulkAssign = regionCount > 1;
2002    try {
2003      checkOpen();
2004    } catch (IOException ie) {
2005      TableName tableName = null;
2006      if (regionCount == 1) {
2007        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri = request.getOpenInfo(0).getRegion();
2008        if (ri != null) {
2009          tableName = ProtobufUtil.toTableName(ri.getTableName());
2010        }
2011      }
2012      if (!TableName.META_TABLE_NAME.equals(tableName)) {
2013        throw new ServiceException(ie);
2014      }
2015      // We are assigning meta, wait a little for regionserver to finish initialization.
2016      int timeout = regionServer.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
2017        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout
2018      long endTime = System.currentTimeMillis() + timeout;
2019      synchronized (regionServer.online) {
2020        try {
2021          while (System.currentTimeMillis() <= endTime
2022              && !regionServer.isStopped() && !regionServer.isOnline()) {
2023            regionServer.online.wait(regionServer.msgInterval);
2024          }
2025          checkOpen();
2026        } catch (InterruptedException t) {
2027          Thread.currentThread().interrupt();
2028          throw new ServiceException(t);
2029        } catch (IOException e) {
2030          throw new ServiceException(e);
2031        }
2032      }
2033    }
2034
2035    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
2036
2037    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
2038      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
2039      TableDescriptor htd;
2040      try {
2041        String encodedName = region.getEncodedName();
2042        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
2043        final HRegion onlineRegion = regionServer.getRegion(encodedName);
2044        if (onlineRegion != null) {
2045          // The region is already online. This should not happen any more.
2046          String error = "Received OPEN for the region:"
2047            + region.getRegionNameAsString() + ", which is already online";
2048          LOG.warn(error);
2049          //regionServer.abort(error);
2050          //throw new IOException(error);
2051          builder.addOpeningState(RegionOpeningState.OPENED);
2052          continue;
2053        }
2054        LOG.info("Open " + region.getRegionNameAsString());
2055
2056        final Boolean previous = regionServer.getRegionsInTransitionInRS().putIfAbsent(
2057          encodedNameBytes, Boolean.TRUE);
2058
2059        if (Boolean.FALSE.equals(previous)) {
2060          if (regionServer.getRegion(encodedName) != null) {
2061            // There is a close in progress. This should not happen any more.
2062            String error = "Received OPEN for the region:"
2063              + region.getRegionNameAsString() + ", which we are already trying to CLOSE";
2064            regionServer.abort(error);
2065            throw new IOException(error);
2066          }
2067          regionServer.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
2068        }
2069
2070        if (Boolean.TRUE.equals(previous)) {
2071          // An open is in progress. This is supported, but let's log this.
2072          LOG.info("Receiving OPEN for the region:" +
2073            region.getRegionNameAsString() + ", which we are already trying to OPEN"
2074              + " - ignoring this new request for this region.");
2075        }
2076
2077        // We are opening this region. If it moves back and forth for whatever reason, we don't
2078        // want to keep returning the stale moved record while we are opening/if we close again.
2079        regionServer.removeFromMovedRegions(region.getEncodedName());
2080
2081        if (previous == null || !previous.booleanValue()) {
2082          htd = htds.get(region.getTable());
2083          if (htd == null) {
2084            htd = regionServer.tableDescriptors.get(region.getTable());
2085            htds.put(region.getTable(), htd);
2086          }
2087          if (htd == null) {
2088            throw new IOException("Missing table descriptor for " + region.getEncodedName());
2089          }
2090          // If there is no action in progress, we can submit a specific handler.
2091          // Need to pass the expected version in the constructor.
2092          if (regionServer.executorService == null) {
2093            LOG.info("No executor executorService; skipping open request");
2094          } else {
2095            if (region.isMetaRegion()) {
2096              regionServer.executorService.submit(new OpenMetaHandler(
2097              regionServer, regionServer, region, htd, masterSystemTime));
2098            } else {
2099              if (regionOpenInfo.getFavoredNodesCount() > 0) {
2100                regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
2101                regionOpenInfo.getFavoredNodesList());
2102              }
2103              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
2104                regionServer.executorService.submit(new OpenPriorityRegionHandler(
2105                regionServer, regionServer, region, htd, masterSystemTime));
2106              } else {
2107                regionServer.executorService.submit(new OpenRegionHandler(
2108                regionServer, regionServer, region, htd, masterSystemTime));
2109              }
2110            }
2111          }
2112        }
2113
2114        builder.addOpeningState(RegionOpeningState.OPENED);
2115      } catch (IOException ie) {
2116        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
2117        if (isBulkAssign) {
2118          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
2119        } else {
2120          throw new ServiceException(ie);
2121        }
2122      }
2123    }
2124    return builder.build();
2125  }
2126
2127  /**
2128   * Warmup a region on this server.
2129   * This method should only be called by Master. It synchronously opens the region and
2130   * closes the region bringing the most important pages in cache.
2131   */
2132  @Override
2133  public WarmupRegionResponse warmupRegion(final RpcController controller,
2134      final WarmupRegionRequest request) throws ServiceException {
2135    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
2136    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
2137    try {
2138      checkOpen();
2139      String encodedName = region.getEncodedName();
2140      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
2141      final HRegion onlineRegion = regionServer.getRegion(encodedName);
2142      if (onlineRegion != null) {
2143        LOG.info("{} is online; skipping warmup", region);
2144        return response;
2145      }
2146      TableDescriptor htd = regionServer.tableDescriptors.get(region.getTable());
2147      if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
2148        LOG.info("{} is in transition; skipping warmup", region);
2149        return response;
2150      }
2151      LOG.info("Warmup {}", region.getRegionNameAsString());
2152      HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
2153          regionServer.getConfiguration(), regionServer, null);
2154    } catch (IOException ie) {
2155      LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
2156      throw new ServiceException(ie);
2157    }
2158
2159    return response;
2160  }
2161
2162  /**
2163   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
2164   * that the given mutations will be durable on the receiving RS if this method returns without any
2165   * exception.
2166   * @param controller the RPC controller
2167   * @param request the request
2168   * @throws ServiceException
2169   */
2170  @Override
2171  @QosPriority(priority = HConstants.REPLAY_QOS)
2172  public ReplicateWALEntryResponse replay(final RpcController controller,
2173      final ReplicateWALEntryRequest request) throws ServiceException {
2174    long before = EnvironmentEdgeManager.currentTime();
2175    CellScanner cells = ((HBaseRpcController) controller).cellScanner();
2176    try {
2177      checkOpen();
2178      List<WALEntry> entries = request.getEntryList();
2179      if (entries == null || entries.isEmpty()) {
2180        // empty input
2181        return ReplicateWALEntryResponse.newBuilder().build();
2182      }
2183      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
2184      HRegion region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
2185      RegionCoprocessorHost coprocessorHost =
2186          ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
2187            ? region.getCoprocessorHost()
2188            : null; // do not invoke coprocessors if this is a secondary region replica
2189      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();
2190
2191      // Skip adding the edits to WAL if this is a secondary region replica
2192      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
2193      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
2194
2195      for (WALEntry entry : entries) {
2196        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
2197          throw new NotServingRegionException("Replay request contains entries from multiple " +
2198              "regions. First region:" + regionName.toStringUtf8() + " , other region:"
2199              + entry.getKey().getEncodedRegionName());
2200        }
2201        if (regionServer.nonceManager != null && isPrimary) {
2202          long nonceGroup = entry.getKey().hasNonceGroup()
2203            ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
2204          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
2205          regionServer.nonceManager.reportOperationFromWal(
2206              nonceGroup,
2207              nonce,
2208              entry.getKey().getWriteTime());
2209        }
2210        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
2211        List<MutationReplay> edits = WALSplitUtil.getMutationsFromWALEntry(entry,
2212          cells, walEntry, durability);
2213        if (coprocessorHost != null) {
2214          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
2215          // KeyValue.
2216          if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
2217            walEntry.getSecond())) {
2218            // if bypass this log entry, ignore it ...
2219            continue;
2220          }
2221          walEntries.add(walEntry);
2222        }
2223        if(edits!=null && !edits.isEmpty()) {
2224          // HBASE-17924
2225          // sort to improve lock efficiency
2226          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
2227          long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
2228            entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
2229          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
2230          // check if it's a partial success
2231          for (int i = 0; result != null && i < result.length; i++) {
2232            if (result[i] != OperationStatus.SUCCESS) {
2233              throw new IOException(result[i].getExceptionMsg());
2234            }
2235          }
2236        }
2237      }
2238
2239      //sync wal at the end because ASYNC_WAL is used above
2240      WAL wal = region.getWAL();
2241      if (wal != null) {
2242        wal.sync();
2243      }
2244
2245      if (coprocessorHost != null) {
2246        for (Pair<WALKey, WALEdit> entry : walEntries) {
2247          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
2248            entry.getSecond());
2249        }
2250      }
2251      return ReplicateWALEntryResponse.newBuilder().build();
2252    } catch (IOException ie) {
2253      throw new ServiceException(ie);
2254    } finally {
2255      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
2256      if (metricsRegionServer != null) {
2257        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
2258      }
2259    }
2260  }
2261
2262  /**
2263   * Replicate WAL entries on the region server.
2264   *
2265   * @param controller the RPC controller
2266   * @param request the request
2267   */
2268  @Override
2269  @QosPriority(priority=HConstants.REPLICATION_QOS)
2270  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
2271      final ReplicateWALEntryRequest request) throws ServiceException {
2272    try {
2273      checkOpen();
2274      if (regionServer.getReplicationSinkService() != null) {
2275        requestCount.increment();
2276        List<WALEntry> entries = request.getEntryList();
2277        CellScanner cellScanner = ((HBaseRpcController)controller).cellScanner();
2278        regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries();
2279        regionServer.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
2280          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
2281          request.getSourceHFileArchiveDirPath());
2282        regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries();
2283        return ReplicateWALEntryResponse.newBuilder().build();
2284      } else {
2285        throw new ServiceException("Replication services are not initialized yet");
2286      }
2287    } catch (IOException ie) {
2288      throw new ServiceException(ie);
2289    }
2290  }
2291
2292  /**
2293   * Roll the WAL writer of the region server.
2294   * @param controller the RPC controller
2295   * @param request the request
2296   * @throws ServiceException
2297   */
2298  @Override
2299  public RollWALWriterResponse rollWALWriter(final RpcController controller,
2300      final RollWALWriterRequest request) throws ServiceException {
2301    try {
2302      checkOpen();
2303      requestCount.increment();
2304      regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
2305      regionServer.getWalRoller().requestRollAll();
2306      regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
2307      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
2308      return builder.build();
2309    } catch (IOException ie) {
2310      throw new ServiceException(ie);
2311    }
2312  }
2313
2314
2315  /**
2316   * Stop the region server.
2317   *
2318   * @param controller the RPC controller
2319   * @param request the request
2320   * @throws ServiceException
2321   */
2322  @Override
2323  @QosPriority(priority=HConstants.ADMIN_QOS)
2324  public StopServerResponse stopServer(final RpcController controller,
2325      final StopServerRequest request) throws ServiceException {
2326    requestCount.increment();
2327    String reason = request.getReason();
2328    regionServer.stop(reason);
2329    return StopServerResponse.newBuilder().build();
2330  }
2331
2332  @Override
2333  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
2334      UpdateFavoredNodesRequest request) throws ServiceException {
2335    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
2336    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
2337    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
2338      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
2339      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
2340        regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
2341          regionUpdateInfo.getFavoredNodesList());
2342      }
2343    }
2344    respBuilder.setResponse(openInfoList.size());
2345    return respBuilder.build();
2346  }
2347
2348  /**
2349   * Atomically bulk load several HFiles into an open region
2350   * @return true if successful, false is failed but recoverably (no action)
2351   * @throws ServiceException if failed unrecoverably
2352   */
2353  @Override
2354  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
2355      final BulkLoadHFileRequest request) throws ServiceException {
2356    long start = EnvironmentEdgeManager.currentTime();
2357    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
2358    if(clusterIds.contains(this.regionServer.clusterId)){
2359      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
2360    } else {
2361      clusterIds.add(this.regionServer.clusterId);
2362    }
2363    try {
2364      checkOpen();
2365      requestCount.increment();
2366      HRegion region = getRegion(request.getRegion());
2367      Map<byte[], List<Path>> map = null;
2368      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
2369      long sizeToBeLoaded = -1;
2370
2371      // Check to see if this bulk load would exceed the space quota for this table
2372      if (spaceQuotaEnabled) {
2373        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
2374        SpaceViolationPolicyEnforcement enforcement = activeSpaceQuotas.getPolicyEnforcement(
2375            region);
2376        if (enforcement != null) {
2377          // Bulk loads must still be atomic. We must enact all or none.
2378          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
2379          for (FamilyPath familyPath : request.getFamilyPathList()) {
2380            filePaths.add(familyPath.getPath());
2381          }
2382          // Check if the batch of files exceeds the current quota
2383          sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths);
2384        }
2385      }
2386
2387      List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
2388      for (FamilyPath familyPath : request.getFamilyPathList()) {
2389        familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath()));
2390      }
2391      if (!request.hasBulkToken()) {
2392        if (region.getCoprocessorHost() != null) {
2393          region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
2394        }
2395        try {
2396          map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
2397              request.getCopyFile(), clusterIds, request.getReplicate());
2398        } finally {
2399          if (region.getCoprocessorHost() != null) {
2400            region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
2401          }
2402        }
2403      } else {
2404        // secure bulk load
2405        map = regionServer.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
2406      }
2407      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
2408      builder.setLoaded(map != null);
2409      if (map != null) {
2410        // Treat any negative size as a flag to "ignore" updating the region size as that is
2411        // not possible to occur in real life (cannot bulk load a file with negative size)
2412        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
2413          if (LOG.isTraceEnabled()) {
2414            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
2415                + sizeToBeLoaded + " bytes");
2416          }
2417          // Inform space quotas of the new files for this region
2418          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(
2419              region.getRegionInfo(), sizeToBeLoaded);
2420        }
2421      }
2422      return builder.build();
2423    } catch (IOException ie) {
2424      throw new ServiceException(ie);
2425    } finally {
2426      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
2427      if (metricsRegionServer != null) {
2428        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
2429      }
2430    }
2431  }
2432
2433  @Override
2434  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
2435      PrepareBulkLoadRequest request) throws ServiceException {
2436    try {
2437      checkOpen();
2438      requestCount.increment();
2439
2440      HRegion region = getRegion(request.getRegion());
2441
2442      String bulkToken = regionServer.getSecureBulkLoadManager().prepareBulkLoad(region, request);
2443      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
2444      builder.setBulkToken(bulkToken);
2445      return builder.build();
2446    } catch (IOException ie) {
2447      throw new ServiceException(ie);
2448    }
2449  }
2450
2451  @Override
2452  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
2453      CleanupBulkLoadRequest request) throws ServiceException {
2454    try {
2455      checkOpen();
2456      requestCount.increment();
2457
2458      HRegion region = getRegion(request.getRegion());
2459
2460      regionServer.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
2461      return CleanupBulkLoadResponse.newBuilder().build();
2462    } catch (IOException ie) {
2463      throw new ServiceException(ie);
2464    }
2465  }
2466
2467  @Override
2468  public CoprocessorServiceResponse execService(final RpcController controller,
2469      final CoprocessorServiceRequest request) throws ServiceException {
2470    try {
2471      checkOpen();
2472      requestCount.increment();
2473      HRegion region = getRegion(request.getRegion());
2474      com.google.protobuf.Message result = execServiceOnRegion(region, request.getCall());
2475      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
2476      builder.setRegion(RequestConverter.buildRegionSpecifier(
2477        RegionSpecifierType.REGION_NAME, region.getRegionInfo().getRegionName()));
2478      // TODO: COPIES!!!!!!
2479      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).
2480        setValue(org.apache.hbase.thirdparty.com.google.protobuf.ByteString.
2481            copyFrom(result.toByteArray())));
2482      return builder.build();
2483    } catch (IOException ie) {
2484      throw new ServiceException(ie);
2485    }
2486  }
2487
2488  private com.google.protobuf.Message execServiceOnRegion(HRegion region,
2489      final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
2490    // ignore the passed in controller (from the serialized call)
2491    ServerRpcController execController = new ServerRpcController();
2492    return region.execService(execController, serviceCall);
2493  }
2494
2495  /**
2496   * Get data from a table.
2497   *
2498   * @param controller the RPC controller
2499   * @param request the get request
2500   * @throws ServiceException
2501   */
2502  @Override
2503  public GetResponse get(final RpcController controller,
2504      final GetRequest request) throws ServiceException {
2505    long before = EnvironmentEdgeManager.currentTime();
2506    OperationQuota quota = null;
2507    HRegion region = null;
2508    try {
2509      checkOpen();
2510      requestCount.increment();
2511      rpcGetRequestCount.increment();
2512      region = getRegion(request.getRegion());
2513
2514      GetResponse.Builder builder = GetResponse.newBuilder();
2515      ClientProtos.Get get = request.getGet();
2516      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
2517      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
2518      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
2519      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
2520      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
2521        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? " +
2522            "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " +
2523            "reverse Scan.");
2524      }
2525      Boolean existence = null;
2526      Result r = null;
2527      RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2528      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
2529
2530      Get clientGet = ProtobufUtil.toGet(get);
2531      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2532        existence = region.getCoprocessorHost().preExists(clientGet);
2533      }
2534      if (existence == null) {
2535        if (context != null) {
2536          r = get(clientGet, (region), null, context);
2537        } else {
2538          // for test purpose
2539          r = region.get(clientGet);
2540        }
2541        if (get.getExistenceOnly()) {
2542          boolean exists = r.getExists();
2543          if (region.getCoprocessorHost() != null) {
2544            exists = region.getCoprocessorHost().postExists(clientGet, exists);
2545          }
2546          existence = exists;
2547        }
2548      }
2549      if (existence != null) {
2550        ClientProtos.Result pbr =
2551            ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
2552        builder.setResult(pbr);
2553      } else if (r != null) {
2554        ClientProtos.Result pbr;
2555        if (isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
2556            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)) {
2557          pbr = ProtobufUtil.toResultNoData(r);
2558          ((HBaseRpcController) controller).setCellScanner(CellUtil.createCellScanner(r
2559              .rawCells()));
2560          addSize(context, r, null);
2561        } else {
2562          pbr = ProtobufUtil.toResult(r);
2563        }
2564        builder.setResult(pbr);
2565      }
2566      //r.cells is null when an table.exists(get) call
2567      if (r != null && r.rawCells() != null) {
2568        quota.addGetResult(r);
2569      }
2570      return builder.build();
2571    } catch (IOException ie) {
2572      throw new ServiceException(ie);
2573    } finally {
2574      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
2575      if (metricsRegionServer != null) {
2576        TableDescriptor td = region != null? region.getTableDescriptor(): null;
2577        if (td != null) {
2578          metricsRegionServer.updateGet(
2579              td.getTableName(), EnvironmentEdgeManager.currentTime() - before);
2580        }
2581      }
2582      if (quota != null) {
2583        quota.close();
2584      }
2585    }
2586  }
2587
  /**
   * Server-side single-row get, implemented as a one-shot Scan so the scanner can be
   * handed to the RPC machinery for deferred close.
   * This method is almost the same as HRegion#get but additionally wires the scanner's
   * close into either the RpcCallContext or the supplied close callback.
   *
   * @param get the client get to execute
   * @param region the region to read from
   * @param closeCallBack aggregating close callback from multi(); null for a plain get()
   * @param context the current RPC call context; used to close the scanner when
   *   {@code closeCallBack} is null
   * @return the result, with the existence flag populated for existence-only gets and
   *   the stale flag set when reading a replica
   * @throws IOException if the scan fails
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
      RpcCallContext context) throws IOException {
    region.prepareGet(get);
    // Reads from a non-default replica are marked stale in the returned Result.
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    long before = EnvironmentEdgeManager.currentTime();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        // Coprocessor short-circuited the get; whatever it put in `results` is the answer.
        region.metricsUpdateForGet(results, before);
        return Result
            .create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    try {
      scanner = region.getScanner(scan);
      // Single next() suffices: a get maps to exactly one row.
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          // NOTE(review): assumes context is non-null whenever closeCallBack is null
          // (callers pass a live RpcCallContext on this path) -- would NPE otherwise.
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet(results, before);

    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
  }
2638
2639  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
2640    int sum = 0;
2641    String firstRegionName = null;
2642    for (RegionAction regionAction : request.getRegionActionList()) {
2643      if (sum == 0) {
2644        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
2645      }
2646      sum += regionAction.getActionCount();
2647    }
2648    if (sum > rowSizeWarnThreshold) {
2649      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold +
2650        ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: " +
2651        RpcServer.getRequestUserName().orElse(null) + "/" +
2652        RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
2653      if (rejectRowsWithSizeOverThreshold) {
2654        throw new ServiceException(
2655          "Rejecting large batch operation for current batch with firstRegionName: " +
2656            firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: " +
2657            rowSizeWarnThreshold);
2658      }
2659    }
2660  }
2661
2662  private void failRegionAction(MultiResponse.Builder responseBuilder,
2663    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
2664    CellScanner cellScanner, Throwable error) {
2665    rpcServer.getMetrics().exception(error);
2666    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
2667    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2668    // All Mutations in this RegionAction not executed as we can not see the Region online here
2669    // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
2670    // corresponding to these Mutations.
2671    if (cellScanner != null) {
2672      skipCellsForMutations(regionAction.getActionList(), cellScanner);
2673    }
2674  }
2675
2676  /**
2677   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2678   *
2679   * @param rpcc the RPC controller
2680   * @param request the multi request
2681   * @throws ServiceException
2682   */
2683  @Override
2684  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2685  throws ServiceException {
2686    try {
2687      checkOpen();
2688    } catch (IOException ie) {
2689      throw new ServiceException(ie);
2690    }
2691
2692    checkBatchSizeAndLogLargeSize(request);
2693
2694    // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
2695    // It is also the conduit via which we pass back data.
2696    HBaseRpcController controller = (HBaseRpcController)rpcc;
2697    CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
2698    if (controller != null) {
2699      controller.setCellScanner(null);
2700    }
2701
2702    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2703
2704    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2705    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2706    this.rpcMultiRequestCount.increment();
2707    this.requestCount.increment();
2708    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2709
2710    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
2711    // following logic is for backward compatibility as old clients still use
2712    // MultiRequest#condition in case of checkAndMutate with RowMutations.
2713    if (request.hasCondition()) {
2714      if (request.getRegionActionList().isEmpty()) {
2715        // If the region action list is empty, do nothing.
2716        responseBuilder.setProcessed(true);
2717        return responseBuilder.build();
2718      }
2719
2720      RegionAction regionAction = request.getRegionAction(0);
2721
2722      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
2723      // we can assume regionAction.getAtomic() is true here.
2724      assert regionAction.getAtomic();
2725
2726      OperationQuota quota;
2727      HRegion region;
2728      RegionSpecifier regionSpecifier = regionAction.getRegion();
2729
2730      try {
2731        region = getRegion(regionSpecifier);
2732        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
2733      } catch (IOException e) {
2734        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2735        return responseBuilder.build();
2736      }
2737
2738      try {
2739        CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2740          cellScanner, request.getCondition(), spaceQuotaEnforcement);
2741        responseBuilder.setProcessed(result.isSuccess());
2742        ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2743          ClientProtos.ResultOrException.newBuilder();
2744        for (int i = 0; i < regionAction.getActionCount(); i++) {
2745          // To unify the response format with doNonAtomicRegionMutation and read through
2746          // client's AsyncProcess we have to add an empty result instance per operation
2747          resultOrExceptionOrBuilder.clear();
2748          resultOrExceptionOrBuilder.setIndex(i);
2749          regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2750        }
2751      } catch (IOException e) {
2752        rpcServer.getMetrics().exception(e);
2753        // As it's an atomic operation with a condition, we may expect it's a global failure.
2754        regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2755      } finally {
2756        quota.close();
2757      }
2758
2759      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2760      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2761      if (regionLoadStats != null) {
2762        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
2763          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
2764      }
2765      return responseBuilder.build();
2766    }
2767
2768    // this will contain all the cells that we need to return. It's created later, if needed.
2769    List<CellScannable> cellsToReturn = null;
2770    RegionScannersCloseCallBack closeCallBack = null;
2771    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2772    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new HashMap<>(request
2773      .getRegionActionCount());
2774
2775    for (RegionAction regionAction : request.getRegionActionList()) {
2776      OperationQuota quota;
2777      HRegion region;
2778      RegionSpecifier regionSpecifier = regionAction.getRegion();
2779      regionActionResultBuilder.clear();
2780
2781      try {
2782        region = getRegion(regionSpecifier);
2783        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
2784      } catch (IOException e) {
2785        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2786        continue;  // For this region it's a failure.
2787      }
2788
2789      try {
2790        if (regionAction.hasCondition()) {
2791          try {
2792            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2793              ClientProtos.ResultOrException.newBuilder();
2794            if (regionAction.getActionCount() == 1) {
2795              CheckAndMutateResult result = checkAndMutate(region, quota,
2796                regionAction.getAction(0).getMutation(), cellScanner,
2797                regionAction.getCondition(), spaceQuotaEnforcement);
2798              regionActionResultBuilder.setProcessed(result.isSuccess());
2799              resultOrExceptionOrBuilder.setIndex(0);
2800              if (result.getResult() != null) {
2801                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
2802              }
2803              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2804            } else {
2805              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2806                cellScanner, regionAction.getCondition(), spaceQuotaEnforcement);
2807              regionActionResultBuilder.setProcessed(result.isSuccess());
2808              for (int i = 0; i < regionAction.getActionCount(); i++) {
2809                if (i == 0 && result.getResult() != null) {
2810                  // Set the result of the Increment/Append operations to the first element of the
2811                  // ResultOrException list
2812                  resultOrExceptionOrBuilder.setIndex(i);
2813                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
2814                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
2815                  continue;
2816                }
2817                // To unify the response format with doNonAtomicRegionMutation and read through
2818                // client's AsyncProcess we have to add an empty result instance per operation
2819                resultOrExceptionOrBuilder.clear();
2820                resultOrExceptionOrBuilder.setIndex(i);
2821                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2822              }
2823            }
2824          } catch (IOException e) {
2825            rpcServer.getMetrics().exception(e);
2826            // As it's an atomic operation with a condition, we may expect it's a global failure.
2827            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2828          }
2829        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2830          try {
2831            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
2832              cellScanner, spaceQuotaEnforcement);
2833            regionActionResultBuilder.setProcessed(true);
2834            // We no longer use MultiResponse#processed. Instead, we use
2835            // RegionActionResult#processed. This is for backward compatibility for old clients.
2836            responseBuilder.setProcessed(true);
2837          } catch (IOException e) {
2838            rpcServer.getMetrics().exception(e);
2839            // As it's atomic, we may expect it's a global failure.
2840            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2841          }
2842        } else {
2843          // doNonAtomicRegionMutation manages the exception internally
2844          if (context != null && closeCallBack == null) {
2845            // An RpcCallBack that creates a list of scanners that needs to perform callBack
2846            // operation on completion of multiGets.
2847            // Set this only once
2848            closeCallBack = new RegionScannersCloseCallBack();
2849            context.setCallBack(closeCallBack);
2850          }
2851          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2852            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
2853            spaceQuotaEnforcement);
2854        }
2855      } finally {
2856        quota.close();
2857      }
2858
2859      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2860      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2861      if(regionLoadStats != null) {
2862        regionStats.put(regionSpecifier, regionLoadStats);
2863      }
2864    }
2865    // Load the controller with the Cells to return.
2866    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2867      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2868    }
2869
2870    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
2871    for(Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat: regionStats.entrySet()){
2872      builder.addRegion(stat.getKey());
2873      builder.addStat(stat.getValue());
2874    }
2875    responseBuilder.setRegionStatistics(builder);
2876    return responseBuilder.build();
2877  }
2878
2879  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2880    if (cellScanner == null) {
2881      return;
2882    }
2883    for (Action action : actions) {
2884      skipCellsForMutation(action, cellScanner);
2885    }
2886  }
2887
2888  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2889    if (cellScanner == null) {
2890      return;
2891    }
2892    try {
2893      if (action.hasMutation()) {
2894        MutationProto m = action.getMutation();
2895        if (m.hasAssociatedCellCount()) {
2896          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2897            cellScanner.advance();
2898          }
2899        }
2900      }
2901    } catch (IOException e) {
2902      // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2903      // marked as failed as we could not see the Region here. At client side the top level
2904      // RegionAction exception will be considered first.
2905      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2906    }
2907  }
2908
2909  /**
2910   * Mutate data in a table.
2911   *
2912   * @param rpcc the RPC controller
2913   * @param request the mutate request
2914   */
2915  @Override
2916  public MutateResponse mutate(final RpcController rpcc,
2917      final MutateRequest request) throws ServiceException {
2918    // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
2919    // It is also the conduit via which we pass back data.
2920    HBaseRpcController controller = (HBaseRpcController)rpcc;
2921    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2922    OperationQuota quota = null;
2923    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2924    // Clear scanner so we are not holding on to reference across call.
2925    if (controller != null) {
2926      controller.setCellScanner(null);
2927    }
2928    try {
2929      checkOpen();
2930      requestCount.increment();
2931      rpcMutateRequestCount.increment();
2932      HRegion region = getRegion(request.getRegion());
2933      MutateResponse.Builder builder = MutateResponse.newBuilder();
2934      MutationProto mutation = request.getMutation();
2935      if (!region.getRegionInfo().isMetaRegion()) {
2936        regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
2937      }
2938      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2939      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
2940      ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager()
2941        .getActiveEnforcements();
2942
2943      if (request.hasCondition()) {
2944        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
2945          request.getCondition(), spaceQuotaEnforcement);
2946        builder.setProcessed(result.isSuccess());
2947        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
2948        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
2949        if (clientCellBlockSupported) {
2950          addSize(context, result.getResult(), null);
2951        }
2952      } else {
2953        Result r = null;
2954        Boolean processed = null;
2955        MutationType type = mutation.getMutateType();
2956        switch (type) {
2957          case APPEND:
2958            // TODO: this doesn't actually check anything.
2959            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
2960            break;
2961          case INCREMENT:
2962            // TODO: this doesn't actually check anything.
2963            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
2964            break;
2965          case PUT:
2966            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
2967            processed = Boolean.TRUE;
2968            break;
2969          case DELETE:
2970            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
2971            processed = Boolean.TRUE;
2972            break;
2973          default:
2974            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
2975        }
2976        if (processed != null) {
2977          builder.setProcessed(processed);
2978        }
2979        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
2980        addResult(builder, r, controller, clientCellBlockSupported);
2981        if (clientCellBlockSupported) {
2982          addSize(context, r, null);
2983        }
2984      }
2985      return builder.build();
2986    } catch (IOException ie) {
2987      regionServer.checkFileSystem();
2988      throw new ServiceException(ie);
2989    } finally {
2990      if (quota != null) {
2991        quota.close();
2992      }
2993    }
2994  }
2995
2996  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
2997    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
2998    long before = EnvironmentEdgeManager.currentTime();
2999    Put put = ProtobufUtil.toPut(mutation, cellScanner);
3000    checkCellSizeLimit(region, put);
3001    spaceQuota.getPolicyEnforcement(region).check(put);
3002    quota.addMutation(put);
3003    region.put(put);
3004
3005    MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
3006    if (metricsRegionServer != null) {
3007      long after = EnvironmentEdgeManager.currentTime();
3008      metricsRegionServer.updatePut(region.getRegionInfo().getTable(), after - before);
3009    }
3010  }
3011
3012  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
3013    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
3014    long before = EnvironmentEdgeManager.currentTime();
3015    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
3016    checkCellSizeLimit(region, delete);
3017    spaceQuota.getPolicyEnforcement(region).check(delete);
3018    quota.addMutation(delete);
3019    region.delete(delete);
3020
3021    MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
3022    if (metricsRegionServer != null) {
3023      long after = EnvironmentEdgeManager.currentTime();
3024      metricsRegionServer.updateDelete(region.getRegionInfo().getTable(), after - before);
3025    }
3026  }
3027
3028  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
3029    MutationProto mutation, CellScanner cellScanner, Condition condition,
3030    ActivePolicyEnforcement spaceQuota) throws IOException {
3031    long before = EnvironmentEdgeManager.currentTime();
3032    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation,
3033      cellScanner);
3034    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
3035    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
3036    quota.addMutation((Mutation) checkAndMutate.getAction());
3037
3038    CheckAndMutateResult result = null;
3039    if (region.getCoprocessorHost() != null) {
3040      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
3041    }
3042    if (result == null) {
3043      result = region.checkAndMutate(checkAndMutate);
3044      if (region.getCoprocessorHost() != null) {
3045        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
3046      }
3047    }
3048    MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
3049    if (metricsRegionServer != null) {
3050      long after = EnvironmentEdgeManager.currentTime();
3051      metricsRegionServer.updateCheckAndMutate(
3052        region.getRegionInfo().getTable(), after - before);
3053
3054      MutationType type = mutation.getMutateType();
3055      switch (type) {
3056        case PUT:
3057          metricsRegionServer.updateCheckAndPut(
3058            region.getRegionInfo().getTable(), after - before);
3059          break;
3060        case DELETE:
3061          metricsRegionServer.updateCheckAndDelete(
3062            region.getRegionInfo().getTable(), after - before);
3063          break;
3064        default:
3065          break;
3066      }
3067    }
3068    return result;
3069  }
3070
  // This is used to keep compatible with the old client implementation. Consider remove it if we
  // decide to drop the support of the client that still sends close request to a region scanner
  // which has already been exhausted.
  @Deprecated
  private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {

    private static final long serialVersionUID = -4305297078988180130L;

    // Suppress stack trace capture: this shared singleton is thrown on a hot path, and a
    // cached exception must not carry a stack trace from its construction site.
    @Override
    public synchronized Throwable fillInStackTrace() {
      return this;
    }
  };
3084
3085  private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
3086    String scannerName = Long.toString(request.getScannerId());
3087    RegionScannerHolder rsh = scanners.get(scannerName);
3088    if (rsh == null) {
3089      // just ignore the next or close request if scanner does not exists.
3090      if (closedScanners.getIfPresent(scannerName) != null) {
3091        throw SCANNER_ALREADY_CLOSED;
3092      } else {
3093        LOG.warn("Client tried to access missing scanner " + scannerName);
3094        throw new UnknownScannerException(
3095            "Unknown scanner '" + scannerName + "'. This can happen due to any of the following " +
3096                "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " +
3097                "long wait between consecutive client checkins, c) Server may be closing down, " +
3098                "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " +
3099                "possible fix would be increasing the value of" +
3100                "'hbase.client.scanner.timeout.period' configuration.");
3101      }
3102    }
3103    RegionInfo hri = rsh.s.getRegionInfo();
3104    // Yes, should be the same instance
3105    if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) {
3106      String msg = "Region has changed on the scanner " + scannerName + ": regionName="
3107          + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r;
3108      LOG.warn(msg + ", closing...");
3109      scanners.remove(scannerName);
3110      try {
3111        rsh.s.close();
3112      } catch (IOException e) {
3113        LOG.warn("Getting exception closing " + scannerName, e);
3114      } finally {
3115        try {
3116          regionServer.getLeaseManager().cancelLease(scannerName);
3117        } catch (LeaseException e) {
3118          LOG.warn("Getting exception closing " + scannerName, e);
3119        }
3120      }
3121      throw new NotServingRegionException(msg);
3122    }
3123    return rsh;
3124  }
3125
3126  private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.Builder builder)
3127      throws IOException {
3128    HRegion region = getRegion(request.getRegion());
3129    ClientProtos.Scan protoScan = request.getScan();
3130    boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
3131    Scan scan = ProtobufUtil.toScan(protoScan);
3132    // if the request doesn't set this, get the default region setting.
3133    if (!isLoadingCfsOnDemandSet) {
3134      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
3135    }
3136
3137    if (!scan.hasFamilies()) {
3138      // Adding all families to scanner
3139      for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
3140        scan.addFamily(family);
3141      }
3142    }
3143    if (region.getCoprocessorHost() != null) {
3144      // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
3145      // wrapper for the core created RegionScanner
3146      region.getCoprocessorHost().preScannerOpen(scan);
3147    }
3148    RegionScannerImpl coreScanner = region.getScanner(scan);
3149    Shipper shipper = coreScanner;
3150    RegionScanner scanner = coreScanner;
3151    if (region.getCoprocessorHost() != null) {
3152      scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
3153    }
3154    long scannerId = scannerIdGenerator.generateNewScannerId();
3155    builder.setScannerId(scannerId);
3156    builder.setMvccReadPoint(scanner.getMvccReadPoint());
3157    builder.setTtl(scannerLeaseTimeoutPeriod);
3158    String scannerName = String.valueOf(scannerId);
3159
3160    boolean fullRegionScan = !region.getRegionInfo().getTable().isSystemTable() &&
3161      isFullRegionScan(scan, region);
3162
3163    return addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(),
3164      fullRegionScan);
3165  }
3166
3167  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
3168      throws OutOfOrderScannerNextException {
3169    // if nextCallSeq does not match throw Exception straight away. This needs to be
3170    // performed even before checking of Lease.
3171    // See HBASE-5974
3172    if (request.hasNextCallSeq()) {
3173      long callSeq = request.getNextCallSeq();
3174      if (!rsh.incNextCallSeq(callSeq)) {
3175        throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.getNextCallSeq()
3176            + " But the nextCallSeq got from client: " + request.getNextCallSeq() + "; request="
3177            + TextFormat.shortDebugString(request));
3178      }
3179    }
3180  }
3181
3182  private void addScannerLeaseBack(LeaseManager.Lease lease) {
3183    try {
3184      regionServer.getLeaseManager().addLease(lease);
3185    } catch (LeaseStillHeldException e) {
3186      // should not happen as the scanner id is unique.
3187      throw new AssertionError(e);
3188    }
3189  }
3190
3191  private long getTimeLimit(HBaseRpcController controller, boolean allowHeartbeatMessages) {
3192    // Set the time limit to be half of the more restrictive timeout value (one of the
3193    // timeout values must be positive). In the event that both values are positive, the
3194    // more restrictive of the two is used to calculate the limit.
3195    if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
3196      long timeLimitDelta;
3197      if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
3198        timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
3199      } else {
3200        timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
3201      }
3202      if (controller != null && controller.getCallTimeout() > 0) {
3203        timeLimitDelta = Math.min(timeLimitDelta, controller.getCallTimeout());
3204      }
3205      // Use half of whichever timeout value was more restrictive... But don't allow
3206      // the time limit to be less than the allowable minimum (could cause an
3207      // immediatate timeout before scanning any data).
3208      timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
3209      // XXX: Can not use EnvironmentEdge here because TestIncrementTimeRange use a
3210      // ManualEnvironmentEdge. Consider using System.nanoTime instead.
3211      return System.currentTimeMillis() + timeLimitDelta;
3212    }
3213    // Default value of timeLimit is negative to indicate no timeLimit should be
3214    // enforced.
3215    return -1L;
3216  }
3217
3218  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
3219      ScannerContext scannerContext, ScanResponse.Builder builder) {
3220    if (numOfCompleteRows >= limitOfRows) {
3221      if (LOG.isTraceEnabled()) {
3222        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows +
3223            " scannerContext: " + scannerContext);
3224      }
3225      builder.setMoreResults(false);
3226    }
3227  }
3228
3229  // return whether we have more results in region.
3230  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
3231      long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
3232      ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCallContext context)
3233      throws IOException {
3234    HRegion region = rsh.r;
3235    RegionScanner scanner = rsh.s;
3236    long maxResultSize;
3237    if (scanner.getMaxResultSize() > 0) {
3238      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
3239    } else {
3240      maxResultSize = maxQuotaResultSize;
3241    }
3242    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
3243    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
3244    // arbitrary 32. TODO: keep record of general size of results being returned.
3245    List<Cell> values = new ArrayList<>(32);
3246    region.startRegionOperation(Operation.SCAN);
3247    try {
3248      int numOfResults = 0;
3249      int numOfCompleteRows = 0;
3250      long before = EnvironmentEdgeManager.currentTime();
3251      synchronized (scanner) {
3252        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
3253        boolean clientHandlesPartials =
3254            request.hasClientHandlesPartials() && request.getClientHandlesPartials();
3255        boolean clientHandlesHeartbeats =
3256            request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
3257
3258        // On the server side we must ensure that the correct ordering of partial results is
3259        // returned to the client to allow them to properly reconstruct the partial results.
3260        // If the coprocessor host is adding to the result list, we cannot guarantee the
3261        // correct ordering of partial results and so we prevent partial results from being
3262        // formed.
3263        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
3264        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
3265        boolean moreRows = false;
3266
3267        // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
3268        // certain time threshold on the server. When the time threshold is exceeded, the
3269        // server stops the scan and sends back whatever Results it has accumulated within
3270        // that time period (may be empty). Since heartbeat messages have the potential to
3271        // create partial Results (in the event that the timeout occurs in the middle of a
3272        // row), we must only generate heartbeat messages when the client can handle both
3273        // heartbeats AND partials
3274        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
3275
3276        long timeLimit = getTimeLimit(controller, allowHeartbeatMessages);
3277
3278        final LimitScope sizeScope =
3279            allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
3280        final LimitScope timeScope =
3281            allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
3282
3283        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
3284
3285        // Configure with limits for this RPC. Set keep progress true since size progress
3286        // towards size limit should be kept between calls to nextRaw
3287        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
3288        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
3289        // of heap size occupied by cells(being read). Cell data means its key and value parts.
3290        contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize);
3291        contextBuilder.setBatchLimit(scanner.getBatch());
3292        contextBuilder.setTimeLimit(timeScope, timeLimit);
3293        contextBuilder.setTrackMetrics(trackMetrics);
3294        ScannerContext scannerContext = contextBuilder.build();
3295        boolean limitReached = false;
3296        while (numOfResults < maxResults) {
3297          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
3298          // batch limit is a limit on the number of cells per Result. Thus, if progress is
3299          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
3300          // reset the batch progress between nextRaw invocations since we don't want the
3301          // batch progress from previous calls to affect future calls
3302          scannerContext.setBatchProgress(0);
3303
3304          // Collect values to be returned here
3305          moreRows = scanner.nextRaw(values, scannerContext);
3306
3307          if (!values.isEmpty()) {
3308            if (limitOfRows > 0) {
3309              // First we need to check if the last result is partial and we have a row change. If
3310              // so then we need to increase the numOfCompleteRows.
3311              if (results.isEmpty()) {
3312                if (rsh.rowOfLastPartialResult != null &&
3313                    !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)) {
3314                  numOfCompleteRows++;
3315                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
3316                    builder);
3317                }
3318              } else {
3319                Result lastResult = results.get(results.size() - 1);
3320                if (lastResult.mayHaveMoreCellsInRow() &&
3321                    !CellUtil.matchingRows(values.get(0), lastResult.getRow())) {
3322                  numOfCompleteRows++;
3323                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
3324                    builder);
3325                }
3326              }
3327              if (builder.hasMoreResults() && !builder.getMoreResults()) {
3328                break;
3329              }
3330            }
3331            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
3332            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
3333            lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
3334            results.add(r);
3335            numOfResults++;
3336            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
3337              numOfCompleteRows++;
3338              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
3339              if (builder.hasMoreResults() && !builder.getMoreResults()) {
3340                break;
3341              }
3342            }
3343          } else if (!moreRows && !results.isEmpty()) {
3344            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
3345            // last result is false. Otherwise it's possible that: the first nextRaw returned
3346            // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
3347            // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
3348            // return with moreRows=false, which means moreResultsInRegion would be false, it will
3349            // be a contradictory state (HBASE-21206).
3350            int lastIdx = results.size() - 1;
3351            Result r = results.get(lastIdx);
3352            if (r.mayHaveMoreCellsInRow()) {
3353              results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
3354            }
3355          }
3356          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
3357          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
3358          boolean resultsLimitReached = numOfResults >= maxResults;
3359          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;
3360
3361          if (limitReached || !moreRows) {
3362            // We only want to mark a ScanResponse as a heartbeat message in the event that
3363            // there are more values to be read server side. If there aren't more values,
3364            // marking it as a heartbeat is wasteful because the client will need to issue
3365            // another ScanRequest only to realize that they already have all the values
3366            if (moreRows && timeLimitReached) {
3367              // Heartbeat messages occur when the time limit has been reached.
3368              builder.setHeartbeatMessage(true);
3369              if (rsh.needCursor) {
3370                Cell cursorCell = scannerContext.getLastPeekedCell();
3371                if (cursorCell != null) {
3372                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
3373                }
3374              }
3375            }
3376            break;
3377          }
3378          values.clear();
3379        }
3380        builder.setMoreResultsInRegion(moreRows);
3381        // Check to see if the client requested that we track metrics server side. If the
3382        // client requested metrics, retrieve the metrics from the scanner context.
3383        if (trackMetrics) {
3384          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
3385          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
3386          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
3387
3388          for (Entry<String, Long> entry : metrics.entrySet()) {
3389            pairBuilder.setName(entry.getKey());
3390            pairBuilder.setValue(entry.getValue());
3391            metricBuilder.addMetrics(pairBuilder.build());
3392          }
3393
3394          builder.setScanMetrics(metricBuilder.build());
3395        }
3396      }
3397      long end = EnvironmentEdgeManager.currentTime();
3398      long responseCellSize = context != null ? context.getResponseCellSize() : 0;
3399      region.getMetrics().updateScanTime(end - before);
3400      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
3401      if (metricsRegionServer != null) {
3402        metricsRegionServer.updateScanSize(
3403            region.getTableDescriptor().getTableName(), responseCellSize);
3404        metricsRegionServer.updateScanTime(
3405            region.getTableDescriptor().getTableName(), end - before);
3406      }
3407    } finally {
3408      region.closeRegionOperation();
3409    }
3410    // coprocessor postNext hook
3411    if (region.getCoprocessorHost() != null) {
3412      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
3413    }
3414  }
3415
3416  /**
3417   * Scan data in a table.
3418   *
3419   * @param controller the RPC controller
3420   * @param request the scan request
3421   * @throws ServiceException
3422   */
3423  @Override
3424  public ScanResponse scan(final RpcController controller, final ScanRequest request)
3425      throws ServiceException {
3426    if (controller != null && !(controller instanceof HBaseRpcController)) {
3427      throw new UnsupportedOperationException(
3428          "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
3429    }
3430    if (!request.hasScannerId() && !request.hasScan()) {
3431      throw new ServiceException(
3432          new DoNotRetryIOException("Missing required input: scannerId or scan"));
3433    }
3434    try {
3435      checkOpen();
3436    } catch (IOException e) {
3437      if (request.hasScannerId()) {
3438        String scannerName = Long.toString(request.getScannerId());
3439        if (LOG.isDebugEnabled()) {
3440          LOG.debug(
3441            "Server shutting down and client tried to access missing scanner " + scannerName);
3442        }
3443        final LeaseManager leaseManager = regionServer.getLeaseManager();
3444        if (leaseManager != null) {
3445          try {
3446            leaseManager.cancelLease(scannerName);
3447          } catch (LeaseException le) {
3448            // No problem, ignore
3449            if (LOG.isTraceEnabled()) {
3450              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
3451            }
3452          }
3453        }
3454      }
3455      throw new ServiceException(e);
3456    }
3457    requestCount.increment();
3458    rpcScanRequestCount.increment();
3459    RegionScannerHolder rsh;
3460    ScanResponse.Builder builder = ScanResponse.newBuilder();
3461    try {
3462      if (request.hasScannerId()) {
3463        // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
3464        // for more details.
3465        builder.setScannerId(request.getScannerId());
3466        rsh = getRegionScanner(request);
3467      } else {
3468        rsh = newRegionScanner(request, builder);
3469      }
3470    } catch (IOException e) {
3471      if (e == SCANNER_ALREADY_CLOSED) {
3472        // Now we will close scanner automatically if there are no more results for this region but
3473        // the old client will still send a close request to us. Just ignore it and return.
3474        return builder.build();
3475      }
3476      throw new ServiceException(e);
3477    }
3478    if (rsh.fullRegionScan) {
3479      rpcFullScanRequestCount.increment();
3480    }
3481    HRegion region = rsh.r;
3482    String scannerName = rsh.scannerName;
3483    LeaseManager.Lease lease;
3484    try {
3485      // Remove lease while its being processed in server; protects against case
3486      // where processing of request takes > lease expiration time.
3487      lease = regionServer.getLeaseManager().removeLease(scannerName);
3488    } catch (LeaseException e) {
3489      throw new ServiceException(e);
3490    }
3491    if (request.hasRenew() && request.getRenew()) {
3492      // add back and return
3493      addScannerLeaseBack(lease);
3494      try {
3495        checkScanNextCallSeq(request, rsh);
3496      } catch (OutOfOrderScannerNextException e) {
3497        throw new ServiceException(e);
3498      }
3499      return builder.build();
3500    }
3501    OperationQuota quota;
3502    try {
3503      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
3504    } catch (IOException e) {
3505      addScannerLeaseBack(lease);
3506      throw new ServiceException(e);
3507    }
3508    try {
3509      checkScanNextCallSeq(request, rsh);
3510    } catch (OutOfOrderScannerNextException e) {
3511      addScannerLeaseBack(lease);
3512      throw new ServiceException(e);
3513    }
3514    // Now we have increased the next call sequence. If we give client an error, the retry will
3515    // never success. So we'd better close the scanner and return a DoNotRetryIOException to client
3516    // and then client will try to open a new scanner.
3517    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
3518    int rows; // this is scan.getCaching
3519    if (request.hasNumberOfRows()) {
3520      rows = request.getNumberOfRows();
3521    } else {
3522      rows = closeScanner ? 0 : 1;
3523    }
3524    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
3525    // now let's do the real scan.
3526    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
3527    RegionScanner scanner = rsh.s;
3528    // this is the limit of rows for this scan, if we the number of rows reach this value, we will
3529    // close the scanner.
3530    int limitOfRows;
3531    if (request.hasLimitOfRows()) {
3532      limitOfRows = request.getLimitOfRows();
3533    } else {
3534      limitOfRows = -1;
3535    }
3536    MutableObject<Object> lastBlock = new MutableObject<>();
3537    boolean scannerClosed = false;
3538    try {
3539      List<Result> results = new ArrayList<>(Math.min(rows, 512));
3540      if (rows > 0) {
3541        boolean done = false;
3542        // Call coprocessor. Get region info from scanner.
3543        if (region.getCoprocessorHost() != null) {
3544          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
3545          if (!results.isEmpty()) {
3546            for (Result r : results) {
3547              lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
3548            }
3549          }
3550          if (bypass != null && bypass.booleanValue()) {
3551            done = true;
3552          }
3553        }
3554        if (!done) {
3555          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
3556            results, builder, lastBlock, context);
3557        } else {
3558          builder.setMoreResultsInRegion(!results.isEmpty());
3559        }
3560      } else {
3561        // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
3562        builder.setMoreResultsInRegion(true);
3563      }
3564
3565      quota.addScanResult(results);
3566      addResults(builder, results, (HBaseRpcController) controller,
3567        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
3568        isClientCellBlockSupport(context));
3569      if (scanner.isFilterDone() && results.isEmpty()) {
3570        // If the scanner's filter - if any - is done with the scan
3571        // only set moreResults to false if the results is empty. This is used to keep compatible
3572        // with the old scan implementation where we just ignore the returned results if moreResults
3573        // is false. Can remove the isEmpty check after we get rid of the old implementation.
3574        builder.setMoreResults(false);
3575      }
3576      // Later we may close the scanner depending on this flag so here we need to make sure that we
3577      // have already set this flag.
3578      assert builder.hasMoreResultsInRegion();
3579      // we only set moreResults to false in the above code, so set it to true if we haven't set it
3580      // yet.
3581      if (!builder.hasMoreResults()) {
3582        builder.setMoreResults(true);
3583      }
3584      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
3585        // Record the last cell of the last result if it is a partial result
3586        // We need this to calculate the complete rows we have returned to client as the
3587        // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
3588        // current row. We may filter out all the remaining cells for the current row and just
3589        // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
3590        // check for row change.
3591        Result lastResult = results.get(results.size() - 1);
3592        if (lastResult.mayHaveMoreCellsInRow()) {
3593          rsh.rowOfLastPartialResult = lastResult.getRow();
3594        } else {
3595          rsh.rowOfLastPartialResult = null;
3596        }
3597      }
3598      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
3599        scannerClosed = true;
3600        closeScanner(region, scanner, scannerName, context);
3601      }
3602      return builder.build();
3603    } catch (IOException e) {
3604      try {
3605        // scanner is closed here
3606        scannerClosed = true;
3607        // The scanner state might be left in a dirty state, so we will tell the Client to
3608        // fail this RPC and close the scanner while opening up another one from the start of
3609        // row that the client has last seen.
3610        closeScanner(region, scanner, scannerName, context);
3611
3612        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
3613        // used in two different semantics.
3614        // (1) The first is to close the client scanner and bubble up the exception all the way
3615        // to the application. This is preferred when the exception is really un-recoverable
3616        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
3617        // bucket usually.
3618        // (2) Second semantics is to close the current region scanner only, but continue the
3619        // client scanner by overriding the exception. This is usually UnknownScannerException,
3620        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
3621        // application-level ClientScanner has to continue without bubbling up the exception to
3622        // the client. See ClientScanner code to see how it deals with these special exceptions.
3623        if (e instanceof DoNotRetryIOException) {
3624          throw e;
3625        }
3626
3627        // If it is a FileNotFoundException, wrap as a
3628        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
3629        if (e instanceof FileNotFoundException) {
3630          throw new DoNotRetryIOException(e);
3631        }
3632
3633        // We closed the scanner already. Instead of throwing the IOException, and client
3634        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
3635        // a special exception to save an RPC.
3636        if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
3637          // 1.4.0+ clients know how to handle
3638          throw new ScannerResetException("Scanner is closed on the server-side", e);
3639        } else {
3640          // older clients do not know about SRE. Just throw USE, which they will handle
3641          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
3642              + " scanner state for clients older than 1.3.", e);
3643        }
3644      } catch (IOException ioe) {
3645        throw new ServiceException(ioe);
3646      }
3647    } finally {
3648      if (!scannerClosed) {
3649        // Adding resets expiration time on lease.
3650        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
3651        if (context != null) {
3652          context.setCallBack(rsh.shippedCallback);
3653        } else {
3654          // When context != null, adding back the lease will be done in callback set above.
3655          addScannerLeaseBack(lease);
3656        }
3657      }
3658      quota.close();
3659    }
3660  }
3661
3662  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
3663      RpcCallContext context) throws IOException {
3664    if (region.getCoprocessorHost() != null) {
3665      if (region.getCoprocessorHost().preScannerClose(scanner)) {
3666        // bypass the actual close.
3667        return;
3668      }
3669    }
3670    RegionScannerHolder rsh = scanners.remove(scannerName);
3671    if (rsh != null) {
3672      if (context != null) {
3673        context.setCallBack(rsh.closeCallBack);
3674      } else {
3675        rsh.s.close();
3676      }
3677      if (region.getCoprocessorHost() != null) {
3678        region.getCoprocessorHost().postScannerClose(scanner);
3679      }
3680      closedScanners.put(scannerName, scannerName);
3681    }
3682  }
3683
  /**
   * Executes a region-server-level coprocessor service call.
   * <p>
   * Performs the standard pre-check (server open, not aborted) and then delegates the whole
   * request to the region server's coprocessor service dispatcher.
   *
   * @param controller the RPC controller
   * @param request the coprocessor service request to dispatch
   * @throws ServiceException if the pre-check fails or the service call itself fails
   */
  @Override
  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
      CoprocessorServiceRequest request) throws ServiceException {
    rpcPreCheck("execRegionServerService");
    return regionServer.execRegionServerService(controller, request);
  }
3690
3691  @Override
3692  public UpdateConfigurationResponse updateConfiguration(
3693      RpcController controller, UpdateConfigurationRequest request)
3694      throws ServiceException {
3695    try {
3696      this.regionServer.updateConfiguration();
3697    } catch (Exception e) {
3698      throw new ServiceException(e);
3699    }
3700    return UpdateConfigurationResponse.getDefaultInstance();
3701  }
3702
3703  @Override
3704  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(
3705      RpcController controller, GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
3706    try {
3707      final RegionServerSpaceQuotaManager manager =
3708          regionServer.getRegionServerSpaceQuotaManager();
3709      final GetSpaceQuotaSnapshotsResponse.Builder builder =
3710          GetSpaceQuotaSnapshotsResponse.newBuilder();
3711      if (manager != null) {
3712        final Map<TableName,SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
3713        for (Entry<TableName,SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
3714          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
3715              .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
3716              .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue()))
3717              .build());
3718        }
3719      }
3720      return builder.build();
3721    } catch (Exception e) {
3722      throw new ServiceException(e);
3723    }
3724  }
3725
3726  @Override
3727  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
3728      ClearRegionBlockCacheRequest request) {
3729    ClearRegionBlockCacheResponse.Builder builder =
3730        ClearRegionBlockCacheResponse.newBuilder();
3731    CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
3732    List<HRegion> regions = getRegions(request.getRegionList(), stats);
3733    for (HRegion region : regions) {
3734      try {
3735        stats = stats.append(this.regionServer.clearRegionBlockCache(region));
3736      } catch (Exception e) {
3737        stats.addException(region.getRegionInfo().getRegionName(), e);
3738      }
3739    }
3740    stats.withMaxCacheSize(regionServer.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
3741    return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
3742  }
3743
3744  private void executeOpenRegionProcedures(OpenRegionRequest request,
3745      Map<TableName, TableDescriptor> tdCache) {
3746    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
3747    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
3748      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
3749      TableDescriptor tableDesc = tdCache.get(regionInfo.getTable());
3750      if (tableDesc == null) {
3751        try {
3752          tableDesc = regionServer.getTableDescriptors().get(regionInfo.getTable());
3753        } catch (IOException e) {
3754          // Here we do not fail the whole method since we also need deal with other
3755          // procedures, and we can not ignore this one, so we still schedule a
3756          // AssignRegionHandler and it will report back to master if we still can not get the
3757          // TableDescriptor.
3758          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
3759            regionInfo.getTable(), e);
3760        }
3761      }
3762      if (regionOpenInfo.getFavoredNodesCount() > 0) {
3763        regionServer.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
3764          regionOpenInfo.getFavoredNodesList());
3765      }
3766      long procId = regionOpenInfo.getOpenProcId();
3767      if (regionServer.submitRegionProcedure(procId)) {
3768        regionServer.executorService.submit(AssignRegionHandler
3769            .create(regionServer, regionInfo, procId, tableDesc,
3770                masterSystemTime));
3771      }
3772    }
3773  }
3774
3775  private void executeCloseRegionProcedures(CloseRegionRequest request) {
3776    String encodedName;
3777    try {
3778      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
3779    } catch (DoNotRetryIOException e) {
3780      throw new UncheckedIOException("Should not happen", e);
3781    }
3782    ServerName destination = request.hasDestinationServer() ?
3783        ProtobufUtil.toServerName(request.getDestinationServer()) :
3784        null;
3785    long procId = request.getCloseProcId();
3786    if (regionServer.submitRegionProcedure(procId)) {
3787      regionServer.executorService.submit(UnassignRegionHandler
3788          .create(regionServer, encodedName, procId, false, destination));
3789    }
3790  }
3791
3792  private void executeProcedures(RemoteProcedureRequest request) {
3793    RSProcedureCallable callable;
3794    try {
3795      callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
3796        .getDeclaredConstructor().newInstance();
3797    } catch (Exception e) {
3798      LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(),
3799        request.getProcId(), e);
3800      regionServer.remoteProcedureComplete(request.getProcId(), e);
3801      return;
3802    }
3803    callable.init(request.getProcData().toByteArray(), regionServer);
3804    LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
3805    regionServer.executeProcedure(request.getProcId(), callable);
3806  }
3807
3808  @Override
3809  @QosPriority(priority = HConstants.ADMIN_QOS)
3810  public ExecuteProceduresResponse executeProcedures(RpcController controller,
3811      ExecuteProceduresRequest request) throws ServiceException {
3812    try {
3813      checkOpen();
3814      throwOnWrongStartCode(request);
3815      regionServer.getRegionServerCoprocessorHost().preExecuteProcedures();
3816      if (request.getOpenRegionCount() > 0) {
3817        // Avoid reading from the TableDescritor every time(usually it will read from the file
3818        // system)
3819        Map<TableName, TableDescriptor> tdCache = new HashMap<>();
3820        request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
3821      }
3822      if (request.getCloseRegionCount() > 0) {
3823        request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
3824      }
3825      if (request.getProcCount() > 0) {
3826        request.getProcList().forEach(this::executeProcedures);
3827      }
3828      regionServer.getRegionServerCoprocessorHost().postExecuteProcedures();
3829      return ExecuteProceduresResponse.getDefaultInstance();
3830    } catch (IOException e) {
3831      throw new ServiceException(e);
3832    }
3833  }
3834
3835  @Override
3836  @QosPriority(priority = HConstants.ADMIN_QOS)
3837  public SlowLogResponses getSlowLogResponses(final RpcController controller,
3838      final SlowLogResponseRequest request) {
3839    final NamedQueueRecorder namedQueueRecorder =
3840      this.regionServer.getNamedQueueRecorder();
3841    final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder);
3842    SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder()
3843      .addAllSlowLogPayloads(slowLogPayloads)
3844      .build();
3845    return slowLogResponses;
3846  }
3847
3848  private List<SlowLogPayload> getSlowLogPayloads(SlowLogResponseRequest request,
3849      NamedQueueRecorder namedQueueRecorder) {
3850    if (namedQueueRecorder == null) {
3851      return Collections.emptyList();
3852    }
3853    List<SlowLogPayload> slowLogPayloads;
3854    NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
3855    namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
3856    namedQueueGetRequest.setSlowLogResponseRequest(request);
3857    NamedQueueGetResponse namedQueueGetResponse =
3858      namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
3859    slowLogPayloads = namedQueueGetResponse != null ?
3860      namedQueueGetResponse.getSlowLogPayloads() :
3861      Collections.emptyList();
3862    return slowLogPayloads;
3863  }
3864
3865  @Override
3866  @QosPriority(priority = HConstants.ADMIN_QOS)
3867  public SlowLogResponses getLargeLogResponses(final RpcController controller,
3868      final SlowLogResponseRequest request) {
3869    final NamedQueueRecorder namedQueueRecorder =
3870      this.regionServer.getNamedQueueRecorder();
3871    final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder);
3872    SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder()
3873      .addAllSlowLogPayloads(slowLogPayloads)
3874      .build();
3875    return slowLogResponses;
3876  }
3877
3878  @Override
3879  @QosPriority(priority = HConstants.ADMIN_QOS)
3880  public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller,
3881    final ClearSlowLogResponseRequest request) {
3882    final NamedQueueRecorder namedQueueRecorder =
3883      this.regionServer.getNamedQueueRecorder();
3884    boolean slowLogsCleaned = Optional.ofNullable(namedQueueRecorder)
3885      .map(queueRecorder ->
3886        queueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG))
3887      .orElse(false);
3888    ClearSlowLogResponses clearSlowLogResponses = ClearSlowLogResponses.newBuilder()
3889      .setIsCleaned(slowLogsCleaned)
3890      .build();
3891    return clearSlowLogResponses;
3892  }
3893
3894  @Override
3895  public HBaseProtos.LogEntry getLogEntries(RpcController controller,
3896      HBaseProtos.LogRequest request) throws ServiceException {
3897    try {
3898      final String logClassName = request.getLogClassName();
3899      Class<?> logClass = Class.forName(logClassName)
3900        .asSubclass(Message.class);
3901      Method method = logClass.getMethod("parseFrom", ByteString.class);
3902      if (logClassName.contains("SlowLogResponseRequest")) {
3903        SlowLogResponseRequest slowLogResponseRequest =
3904          (SlowLogResponseRequest) method.invoke(null, request.getLogMessage());
3905        final NamedQueueRecorder namedQueueRecorder =
3906          this.regionServer.getNamedQueueRecorder();
3907        final List<SlowLogPayload> slowLogPayloads =
3908          getSlowLogPayloads(slowLogResponseRequest, namedQueueRecorder);
3909        SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder()
3910          .addAllSlowLogPayloads(slowLogPayloads)
3911          .build();
3912        return HBaseProtos.LogEntry.newBuilder()
3913          .setLogClassName(slowLogResponses.getClass().getName())
3914          .setLogMessage(slowLogResponses.toByteString()).build();
3915      }
3916    } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
3917        | InvocationTargetException e) {
3918      LOG.error("Error while retrieving log entries.", e);
3919      throw new ServiceException(e);
3920    }
3921    throw new ServiceException("Invalid request params");
3922  }
3923
  /** Returns the scheduler used by this server's RPC server. */
  public RpcScheduler getRpcScheduler() {
    return rpcServer.getScheduler();
  }
3927
  /** Returns the access checker used for permission checks, for subclasses. */
  protected AccessChecker getAccessChecker() {
    return accessChecker;
  }
3931
  /** Returns the ZooKeeper permission watcher, for subclasses. */
  protected ZKPermissionWatcher getZkPermissionWatcher() {
    return zkPermissionWatcher;
  }
3935}