/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.QosPriority;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
import org.apache.hadoop.hbase.quotas.OperationQuota;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease;
import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.NoopAccessChecker;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReservoirSample;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;

/**
 * Implements the regionserver RPC services.
 */
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
public class RSRpcServices
  implements HBaseRPCErrorHandler, AdminService.BlockingInterface, ClientService.BlockingInterface,
  ClientMetaService.BlockingInterface, PriorityFunction, ConfigurationObserver {
  protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);

  /** RPC scheduler to use for the region server. */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

  /** RPC scheduler to use for the master. */
  public static final String MASTER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.master.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
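
  // For illustration, assuming the usual Configuration plumbing: an operator could raise the
  // enforced minimum, e.g. conf.setLong("hbase.region.server.rpc.minimum.scan.time.limit.delta",
  // 50), so that a client-supplied scan time limit cannot impose a smaller delta than that value.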

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  public static final String CLIENT_BOOTSTRAP_NODE_LIMIT = "hbase.client.bootstrap.node.limit";

  public static final int DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT = 10;
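
  // Presumably used when serving GetBootstrapNodesRequest: at most this many region server
  // addresses are sampled and handed back to clients of the RPC-based connection registry.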

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Server to handle client requests.
  final RpcServerInterface rpcServer;
  final InetSocketAddress isa;

  protected final HRegionServer regionServer;
  private final long maxScannerResultSize;

  // The reference to the priority extraction function
  private final PriorityFunction priority;

  private ScannerIdGenerator scannerIdGenerator;
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name of a closed scanner for a while. This is used to stay compatible with old
  // clients that may send a next or close request to a region scanner which has already been
  // exhausted. The entries are removed automatically after scannerLeaseTimeoutPeriod.
  private final Cache<String, String> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;
  /*
   * Whether we should reject requests with a very high number of rows, i.e. beyond the threshold
   * defined by rowSizeWarnThreshold
   */
  private final boolean rejectRowsWithSizeOverThreshold;

  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  private AccessChecker accessChecker;
  private ZKPermissionWatcher zkPermissionWatcher;

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below booleans
   * to selectively enable/disable either Admin or Client Service (Rare is the case where you would
   * ever turn off one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
    "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
    "hbase.regionserver.client.executorService";
  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
    "hbase.regionserver.client.meta.executorService";

  /**
   * An Rpc callback for closing a RegionScanner.
   */
  private static final class RegionScannerCloseCallBack implements RpcCallback {

    private final RegionScanner scanner;

    public RegionScannerCloseCallBack(RegionScanner scanner) {
      this.scanner = scanner;
    }

    @Override
    public void run() throws IOException {
      this.scanner.close();
    }
  }

  /**
   * An Rpc callback for calling shipped() on a RegionScanner.
   */
  private class RegionScannerShippedCallBack implements RpcCallback {
    private final String scannerName;
    private final Shipper shipper;
    private final Lease lease;

    public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
      this.scannerName = scannerName;
      this.shipper = shipper;
      this.lease = lease;
    }

    @Override
    public void run() throws IOException {
      this.shipper.shipped();
      // We're done. On the way out, re-add the lease removed above. The lease was temporarily
      // removed for this Rpc call and the call has now ended, so it is time to add it back.
      if (scanners.containsKey(scannerName)) {
        if (lease != null) {
          regionServer.getLeaseManager().addLease(lease);
        }
      }
    }
  }

  /**
   * An RpcCallback that collects the scanners created for a multiGet so the close operation can
   * be performed on each of them when the call completes.
   */
  static class RegionScannersCloseCallBack implements RpcCallback {
    private final List<RegionScanner> scanners = new ArrayList<>();

    public void addScanner(RegionScanner scanner) {
      this.scanners.add(scanner);
    }

    @Override
    public void run() {
      for (RegionScanner scanner : scanners) {
        try {
          scanner.close();
        } catch (IOException e) {
          LOG.error("Exception while closing the scanner " + scanner, e);
        }
      }
    }
  }

  /**
   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
   */
  static final class RegionScannerHolder {
    private final AtomicLong nextCallSeq = new AtomicLong(0);
    private final RegionScanner s;
    private final HRegion r;
    private final RpcCallback closeCallBack;
    private final RpcCallback shippedCallback;
    private byte[] rowOfLastPartialResult;
    private boolean needCursor;
    private boolean fullRegionScan;
    private final String clientIPAndPort;
    private final String userName;

    RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
      RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
      String clientIPAndPort, String userName) {
      this.s = s;
      this.r = r;
      this.closeCallBack = closeCallBack;
      this.shippedCallback = shippedCallback;
      this.needCursor = needCursor;
      this.fullRegionScan = fullRegionScan;
      this.clientIPAndPort = clientIPAndPort;
      this.userName = userName;
    }

    long getNextCallSeq() {
      return nextCallSeq.get();
    }

    boolean incNextCallSeq(long currentSeq) {
      // Use CAS to prevent multiple scan requests from running on the same scanner.
      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
    }
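
    // Illustrative scenario: if a client retries a scan RPC, both attempts carry the same
    // sequence number; only the first compareAndSet(currentSeq, currentSeq + 1) succeeds, so the
    // duplicate attempt can be detected instead of silently advancing the scanner twice.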

    // Should be called only when we need to print lease-expired messages; otherwise, cache the
    // String once it is made.
    @Override
    public String toString() {
      return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName
        + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString();
    }
  }

  /**
   * Instantiated as a scanner lease. If the lease times out, the scanner is closed.
   */
  private class ScannerListener implements LeaseListener {
    private final String scannerName;

    ScannerListener(final String n) {
      this.scannerName = n;
    }

    @Override
    public void leaseExpired() {
      RegionScannerHolder rsh = scanners.remove(this.scannerName);
      if (rsh == null) {
        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
        return;
      }
      LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
      regionServer.getMetrics().incrScannerLeaseExpired();
      RegionScanner s = rsh.s;
      HRegion region = null;
      try {
        region = regionServer.getRegion(s.getRegionInfo().getRegionName());
        if (region != null && region.getCoprocessorHost() != null) {
          region.getCoprocessorHost().preScannerClose(s);
        }
      } catch (IOException e) {
        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
      } finally {
        try {
          s.close();
          if (region != null && region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().postScannerClose(s);
          }
        } catch (IOException e) {
          LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
        }
      }
    }
  }

  private static ResultOrException getResultOrException(final ClientProtos.Result r,
    final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(r), index);
  }

  private static ResultOrException getResultOrException(final Exception e, final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(e), index);
  }

  private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
    final int index) {
    return builder.setIndex(index).build();
  }

  /**
   * Performs the following pre-checks, in order:
   * <ol>
   * <li>RegionServer is running</li>
   * <li>If authorization is enabled, the RPC caller has ADMIN permission</li>
   * </ol>
   * @param requestName name of the rpc request. Used in reporting failures to provide context.
   * @throws ServiceException If any of the above listed pre-checks fails.
   */
  private void rpcPreCheck(String requestName) throws ServiceException {
    try {
      checkOpen();
      requirePermission(requestName, Permission.Action.ADMIN);
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
  }

  private boolean isClientCellBlockSupport(RpcCallContext context) {
    return context != null && context.isClientCellBlockSupported();
  }
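
  // In both addResult and addResults below, a client that supports cell blocks receives only
  // result metadata in the protobuf message while the cells travel in the RPC cell block set on
  // the controller; otherwise the cells are serialized inline into the protobuf Result.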

  private void addResult(final MutateResponse.Builder builder, final Result result,
    final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
    if (result == null) return;
    if (clientCellBlockSupported) {
      builder.setResult(ProtobufUtil.toResultNoData(result));
      rpcc.setCellScanner(result.cellScanner());
    } else {
      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
      builder.setResult(pbr);
    }
  }

  private void addResults(ScanResponse.Builder builder, List<Result> results,
    HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
    builder.setStale(!isDefaultRegion);
    if (results.isEmpty()) {
      return;
    }
    if (clientCellBlockSupported) {
      for (Result res : results) {
        builder.addCellsPerResult(res.size());
        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
      }
      controller.setCellScanner(CellUtil.createCellScanner(results));
    } else {
      for (Result res : results) {
        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
        builder.addResults(pbr);
      }
    }
  }

  private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
    CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaRegion()) {
        regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      List<Mutation> mutations = new ArrayList<>();
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : actions) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto mutation = action.getMutation();
        MutationType type = mutation.getMutateType();
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(mutation, cellScanner);
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            mutations.add(put);
            break;
          case DELETE:
            Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
            ++countOfCompleteMutation;
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            mutations.add(del);
            break;
          case INCREMENT:
            Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, increment);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
            mutations.add(increment);
            break;
          case APPEND:
            Append append = ProtobufUtil.toAppend(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, append);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
            mutations.add(append);
            break;
          default:
            throw new DoNotRetryIOException("invalid mutation type : " + type);
        }
      }

      if (mutations.size() == 0) {
        return new CheckAndMutateResult(true, null);
      } else {
        CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
        CheckAndMutateResult result = null;
        if (region.getCoprocessorHost() != null) {
          result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
        }
        if (result == null) {
          result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
          if (region.getCoprocessorHost() != null) {
            result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
          }
        }
        return result;
      }
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }

  /**
   * Execute an append mutation.
   * @return the Result of the append to return to the client; an empty Result if the region
   *         returned no result
   */
  private Result append(final HRegion region, final OperationQuota quota,
    final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
    ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
    checkCellSizeLimit(region, append);
    spaceQuota.getPolicyEnforcement(region).check(append);
    quota.addMutation(append);
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    Result r = region.append(append, nonceGroup, nonce);
    if (regionServer.getMetrics() != null) {
      regionServer.getMetrics().updateAppend(region.getTableDescriptor().getTableName(),
        EnvironmentEdgeManager.currentTime() - before);
    }
    return r == null ? Result.EMPTY_RESULT : r;
  }

  /**
   * Execute an increment mutation.
   */
  private Result increment(final HRegion region, final OperationQuota quota,
    final MutationProto mutation, final CellScanner cells, long nonceGroup,
    ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
    checkCellSizeLimit(region, increment);
    spaceQuota.getPolicyEnforcement(region).check(increment);
    quota.addMutation(increment);
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    Result r = region.increment(increment, nonceGroup, nonce);
    final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
    if (metricsRegionServer != null) {
      metricsRegionServer.updateIncrement(region.getTableDescriptor().getTableName(),
        EnvironmentEdgeManager.currentTime() - before);
    }
    return r == null ? Result.EMPTY_RESULT : r;
  }

  /**
   * Run through the <code>actions</code> of the RegionAction and, per Mutation, do the work, and
   * then when done, add an instance of a {@link ResultOrException} that corresponds to each
   * Mutation.
   * @param cellsToReturn Could be null. May be allocated in this method. This is what this method
   *                      returns as a 'result'.
   * @param closeCallBack the callback to be used with multigets
   * @param context       the current RpcCallContext
   * @return the <code>cellsToReturn</code> list, possibly allocated here, to be sent back to the
   *         client
   */
  private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
    final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
    final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
    final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
    // one at a time, we instead pass them in batch. Be aware that the corresponding
    // ResultOrException instance that matches each Put or Delete is then added down in the
    // doNonAtomicBatchOp call. We should be staying aligned even though the Put and Delete are
    // deferred/batched
    List<ClientProtos.Action> mutations = null;
    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
    IOException sizeIOE = null;
    Object lastBlock = null;
    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
      ResultOrException.newBuilder();
    boolean hasResultOrException = false;
    for (ClientProtos.Action action : actions.getActionList()) {
      hasResultOrException = false;
      resultOrExceptionBuilder.clear();
      try {
        Result r = null;

        if (
          context != null && context.isRetryImmediatelySupported()
            && (context.getResponseCellSize() > maxQuotaResultSize
              || context.getResponseBlockSize() + context.getResponseExceptionSize()
                  > maxQuotaResultSize)
        ) {

          // We're storing the exception since the exception and reason string won't
          // change after the response size limit is reached.
          if (sizeIOE == null) {
            // We don't need the stack un-winding so don't throw the exception.
            // Throwing will kill the JVM's JIT.
            //
            // Instead just create the exception and then store it.
            sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
              + context.getResponseCellSize() + " BlockSize: " + context.getResponseBlockSize());

            // Only report the exception once since there's only one request that
            // caused the exception. Otherwise this number will dominate the exceptions count.
            rpcServer.getMetrics().exception(sizeIOE);
          }

          // Now that the exception is known to have been created,
          // use it for the response.
          //
          // This will create a copy in the builder.
          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
          resultOrExceptionBuilder.setException(pair);
          context.incrementResponseExceptionSize(pair.getSerializedSize());
          resultOrExceptionBuilder.setIndex(action.getIndex());
          builder.addResultOrException(resultOrExceptionBuilder.build());
          skipCellsForMutation(action, cellScanner);
          continue;
        }
        if (action.hasGet()) {
          long before = EnvironmentEdgeManager.currentTime();
          ClientProtos.Get pbGet = action.getGet();
          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
          // they are; it's a problem for non-native clients like asynchbase). HBASE-20225.
          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
              + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
              + "reverse Scan.");
          }
          try {
            Get get = ProtobufUtil.toGet(pbGet);
            if (context != null) {
              r = get(get, (region), closeCallBack, context);
            } else {
              r = region.get(get);
            }
          } finally {
            final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
            if (metricsRegionServer != null) {
              metricsRegionServer.updateGet(region.getTableDescriptor().getTableName(),
                EnvironmentEdgeManager.currentTime() - before);
            }
          }
        } else if (action.hasServiceCall()) {
          hasResultOrException = true;
          com.google.protobuf.Message result = execServiceOnRegion(region, action.getServiceCall());
          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
            ClientProtos.CoprocessorServiceResult.newBuilder();
          resultOrExceptionBuilder.setServiceResult(serviceResultBuilder
            .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName())
              // TODO: Copy!!!
              .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
        } else if (action.hasMutation()) {
          MutationType type = action.getMutation().getMutateType();
          if (
            type != MutationType.PUT && type != MutationType.DELETE && mutations != null
              && !mutations.isEmpty()
          ) {
            // Flush out any Puts or Deletes already collected.
            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
              spaceQuotaEnforcement);
            mutations.clear();
          }
          switch (type) {
            case APPEND:
              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement);
              break;
            case INCREMENT:
              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement);
              break;
            case PUT:
            case DELETE:
              // Collect the individual mutations and apply in a batch
              if (mutations == null) {
                mutations = new ArrayList<>(actions.getActionCount());
              }
              mutations.add(action);
              break;
            default:
              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
          }
        } else {
          throw new HBaseIOException("Unexpected Action type");
        }
        if (r != null) {
          ClientProtos.Result pbResult = null;
          if (isClientCellBlockSupport(context)) {
            pbResult = ProtobufUtil.toResultNoData(r);
            // Hard to guess the size here. Just make a rough guess.
            if (cellsToReturn == null) {
              cellsToReturn = new ArrayList<>();
            }
            cellsToReturn.add(r);
          } else {
            pbResult = ProtobufUtil.toResult(r);
          }
          lastBlock = addSize(context, r, lastBlock);
          hasResultOrException = true;
          resultOrExceptionBuilder.setResult(pbResult);
        }
        // Could get to here and there was no result and no exception. Presumes we added
        // a Put or Delete to the collecting Mutations List for adding later. In this
        // case the corresponding ResultOrException instance for the Put or Delete will be added
        // down in the doNonAtomicBatchOp method call rather than up here.
      } catch (IOException ie) {
        rpcServer.getMetrics().exception(ie);
        hasResultOrException = true;
        NameBytesPair pair = ResponseConverter.buildException(ie);
        resultOrExceptionBuilder.setException(pair);
        context.incrementResponseExceptionSize(pair.getSerializedSize());
      }
      if (hasResultOrException) {
        // Propagate index.
        resultOrExceptionBuilder.setIndex(action.getIndex());
        builder.addResultOrException(resultOrExceptionBuilder.build());
      }
    }
    // Finish up any outstanding mutations
    if (!CollectionUtils.isEmpty(mutations)) {
      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
    }
    return cellsToReturn;
  }

  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
    if (r.maxCellSize > 0) {
      CellScanner cells = m.cellScanner();
      while (cells.advance()) {
        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
        if (size > r.maxCellSize) {
          String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of "
            + r.maxCellSize + " bytes";
          LOG.debug(msg);
          throw new DoNotRetryIOException(msg);
        }
      }
    }
  }

  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Just throw the exception. The exception will be caught and then added to the region-level
    // exception for the RegionAction. Leaving the action result null is ok since a null result is
    // viewed as a failure by the hbase client, and the region-level exception will be used to
    // replace the null result. See AsyncRequestFutureImpl#receiveMultiAction and
    // AsyncBatchRpcRetryingCaller#onComplete for more details.
    doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
  }

  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    try {
      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
        spaceQuotaEnforcement, false);
    } catch (IOException e) {
      // Set the exception for each action. The mutations in the same RegionAction are grouped
      // into different batches and then processed individually. Hence, we don't set the
      // region-level exception here for the whole RegionAction.
      for (Action mutation : mutations) {
        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
      }
    }
  }

  /**
   * Execute a list of mutations.
   */
  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
    throws IOException {
    Mutation[] mArray = new Mutation[mutations.size()];
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      /**
       * HBASE-17924: mutationActionMap maps each mutation to its action, since the mutation array
       * may have been reordered. In order to return the right result or exception to the
       * corresponding action, we need to know which action each mutation belongs to. We can't
       * sort the ClientProtos.Action array, since the actions are bound to cellscanners.
       */
      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
      int i = 0;
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : mutations) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto m = action.getMutation();
        Mutation mutation;
        switch (m.getMutateType()) {
          case PUT:
            mutation = ProtobufUtil.toPut(m, cells);
            batchContainsPuts = true;
            break;

          case DELETE:
            mutation = ProtobufUtil.toDelete(m, cells);
            batchContainsDelete = true;
            break;

          case INCREMENT:
            mutation = ProtobufUtil.toIncrement(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          case APPEND:
            mutation = ProtobufUtil.toAppend(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          default:
            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
        }
        mutationActionMap.put(mutation, action);
        mArray[i++] = mutation;
        checkCellSizeLimit(region, mutation);
        // Check if a space quota disallows this mutation
        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
        quota.addMutation(mutation);
      }

      if (!region.getRegionInfo().isMetaRegion()) {
        regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
      }

      // HBASE-17924
      // Sort to improve lock efficiency for a non-atomic batch of operations. If atomic, the
      // order is preserved as expected by the client.
      if (!atomic) {
        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
      }

      OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);

      // When atomic is true, it indicates that the mutateRow API or the batch API with
      // RowMutations is called. In this case, we need to merge the results of the
      // Increment/Append operations if the mutations include those operations, and set the merged
      // result to the first element of the ResultOrException list
      if (atomic) {
        List<ResultOrException> resultOrExceptions = new ArrayList<>();
        List<Result> results = new ArrayList<>();
        for (i = 0; i < codes.length; i++) {
          if (codes[i].getResult() != null) {
            results.add(codes[i].getResult());
          }
          if (i != 0) {
            resultOrExceptions
              .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i));
          }
        }

        if (results.isEmpty()) {
          builder.addResultOrException(
            getResultOrException(ClientProtos.Result.getDefaultInstance(), 0));
        } else {
          // Merge the results of the Increment/Append operations
          List<Cell> cellList = new ArrayList<>();
          for (Result result : results) {
            if (result.rawCells() != null) {
              cellList.addAll(Arrays.asList(result.rawCells()));
            }
          }
          Result result = Result.create(cellList);

          // Set the merged result of the Increment/Append operations to the first element of the
          // ResultOrException list
          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
        }

        builder.addAllResultOrException(resultOrExceptions);
        return;
      }

      for (i = 0; i < codes.length; i++) {
        Mutation currentMutation = mArray[i];
        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
        Exception e;
        switch (codes[i].getOperationStatusCode()) {
          case BAD_FAMILY:
            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SANITY_CHECK_FAILURE:
            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          default:
            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SUCCESS:
            builder.addResultOrException(
              getResultOrException(ClientProtos.Result.getDefaultInstance(), index));
            break;

          case STORE_TOO_BUSY:
            e = new RegionTooBusyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;
        }
      }
    } finally {
      int processedMutationIndex = 0;
      for (Action mutation : mutations) {
        // The non-null mArray[i] means the cell scanner has been read.
        if (mArray[processedMutationIndex++] == null) {
          skipCellsForMutation(mutation, cells);
        }
      }
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
    boolean batchContainsDelete) {
    final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      if (batchContainsPuts) {
        metricsRegionServer.updatePutBatch(region.getTableDescriptor().getTableName(),
          after - starttime);
      }
      if (batchContainsDelete) {
        metricsRegionServer.updateDeleteBatch(region.getTableDescriptor().getTableName(),
          after - starttime);
      }
    }
  }

1133  /**
1134   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
   * constructing MultiResponse to save a possible loop if the caller doesn't need MultiResponse.
   * @param region      the region to apply the mutations to
   * @param mutations   the mutations to replay
   * @param replaySeqId the sequence id to use when replaying the mutations
   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
   *         exceptionMessage if any
1138   */
1139  private OperationStatus[] doReplayBatchOp(final HRegion region,
1140    final List<MutationReplay> mutations, long replaySeqId) throws IOException {
1141    long before = EnvironmentEdgeManager.currentTime();
1142    boolean batchContainsPuts = false, batchContainsDelete = false;
1143    try {
1144      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
1145        MutationReplay m = it.next();
1146
1147        if (m.getType() == MutationType.PUT) {
1148          batchContainsPuts = true;
1149        } else {
1150          batchContainsDelete = true;
1151        }
1152
1153        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
1154        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
1155        if (metaCells != null && !metaCells.isEmpty()) {
1156          for (Cell metaCell : metaCells) {
1157            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
1158            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1159            HRegion hRegion = region;
1160            if (compactionDesc != null) {
1161              // replay the compaction. Remove the files from stores only if we are the primary
1162              // region replica (thus own the files)
1163              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
1164                replaySeqId);
1165              continue;
1166            }
1167            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
1168            if (flushDesc != null && !isDefaultReplica) {
1169              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
1170              continue;
1171            }
1172            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
1173            if (regionEvent != null && !isDefaultReplica) {
1174              hRegion.replayWALRegionEventMarker(regionEvent);
1175              continue;
1176            }
1177            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
1178            if (bulkLoadEvent != null) {
1179              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
1180              continue;
1181            }
1182          }
1183          it.remove();
1184        }
1185      }
1186      requestCount.increment();
1187      if (!region.getRegionInfo().isMetaRegion()) {
1188        regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
1189      }
1190      return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
1191        replaySeqId);
1192    } finally {
1193      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1194    }
1195  }
1196
1197  private void closeAllScanners() {
1198    // Close any outstanding scanners. Means they'll get an UnknownScanner
1199    // exception next time they come in.
1200    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
1201      try {
1202        e.getValue().s.close();
1203      } catch (IOException ioe) {
1204        LOG.warn("Closing scanner " + e.getKey(), ioe);
1205      }
1206    }
1207  }
1208
1209  // Directly invoked only for testing
1210  public RSRpcServices(final HRegionServer rs) throws IOException {
1211    final Configuration conf = rs.getConfiguration();
1212    regionServer = rs;
1213    rowSizeWarnThreshold =
1214      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
1215    rejectRowsWithSizeOverThreshold =
1216      conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
1217
1218    final RpcSchedulerFactory rpcSchedulerFactory;
1219    try {
1220      rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class)
1221        .getDeclaredConstructor().newInstance();
1222    } catch (NoSuchMethodException | InvocationTargetException | InstantiationException
1223      | IllegalAccessException e) {
1224      throw new IllegalArgumentException(e);
1225    }
1226    // Server to handle client requests.
1227    final InetSocketAddress initialIsa;
1228    final InetSocketAddress bindAddress;
1229    if (this instanceof MasterRpcServices) {
1230      String hostname = DNS.getHostname(conf, DNS.ServerType.MASTER);
1231      int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
1232      // Creation of a HSA will force a resolve.
1233      initialIsa = new InetSocketAddress(hostname, port);
1234      bindAddress = new InetSocketAddress(conf.get("hbase.master.ipc.address", hostname), port);
1235    } else {
1236      String hostname = DNS.getHostname(conf, DNS.ServerType.REGIONSERVER);
1237      int port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
1238      // Creation of a HSA will force a resolve.
1239      initialIsa = new InetSocketAddress(hostname, port);
1240      bindAddress =
1241        new InetSocketAddress(conf.get("hbase.regionserver.ipc.address", hostname), port);
1242    }
1243    if (initialIsa.getAddress() == null) {
1244      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
1245    }
1246    priority = createPriority();
    // Using Address means we don't get the IP too. Shorten it even more, to just the host name
    // without the domain.
1249    final String name = rs.getProcessName() + "/"
1250      + Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain();
1251    // Set how many times to retry talking to another server over Connection.
1252    ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG);
1253    rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name);
1254    rpcServer.setRsRpcServices(this);
1255    if (!(rs instanceof HMaster)) {
1256      rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
1257    }
1258    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
1259      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
1260    maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
1261      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
1262    rpcTimeout =
1263      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
1264    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
1265      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
1266
1267    final InetSocketAddress address = rpcServer.getListenerAddress();
1268    if (address == null) {
1269      throw new IOException("Listener channel is closed");
1270    }
1271    // Set our address, however we need the final port that was given to rpcServer
1272    isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
1273    rpcServer.setErrorHandler(this);
1274    rs.setName(name);
1275
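    // Cache of recently closed scanner names, retained for the scanner lease timeout period.
    // This presumably lets a retried RPC against a just-closed scanner be told apart from a
    // request for a scanner this server never created.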
1276    closedScanners = CacheBuilder.newBuilder()
1277      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
1278  }
1279
1280  protected RpcServerInterface createRpcServer(final Server server,
1281    final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
1282    final String name) throws IOException {
1283    final Configuration conf = server.getConfiguration();
1284    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
1285    try {
      // Use the final bindAddress for this server.
      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, conf,
        rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
1291    } catch (BindException be) {
1292      throw new IOException(be.getMessage() + ". To switch ports use the '"
1293        + HConstants.REGIONSERVER_PORT + "' configuration property.",
1294        be.getCause() != null ? be.getCause() : be);
1295    }
1296  }
1297
1298  protected Class<?> getRpcSchedulerFactoryClass() {
1299    final Configuration conf = regionServer.getConfiguration();
1300    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
1301      SimpleRpcSchedulerFactory.class);
1302  }
1303
1304  @Override
1305  public void onConfigurationChange(Configuration newConf) {
1306    if (rpcServer instanceof ConfigurationObserver) {
1307      ((ConfigurationObserver) rpcServer).onConfigurationChange(newConf);
1308    }
1309  }
1310
1311  protected PriorityFunction createPriority() {
1312    return new AnnotationReadingPriorityFunction(this);
1313  }
1314
1315  protected void requirePermission(String request, Permission.Action perm) throws IOException {
1316    if (accessChecker != null) {
1317      accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), request, null, perm);
1318    }
1319  }
1320
1321  public int getScannersCount() {
1322    return scanners.size();
1323  }
1324
1325  /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
1326  RegionScanner getScanner(long scannerId) {
1327    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
1328    return rsh == null ? null : rsh.s;
1329  }
1330
1331  /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null. */
1332  private RegionScannerHolder getRegionScannerHolder(long scannerId) {
1333    return scanners.get(toScannerName(scannerId));
1334  }
1335
1336  public String getScanDetailsWithId(long scannerId) {
1337    RegionScanner scanner = getScanner(scannerId);
1338    if (scanner == null) {
1339      return null;
1340    }
1341    StringBuilder builder = new StringBuilder();
1342    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
1343    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
1344    builder.append(" operation_id: ").append(scanner.getOperationId());
1345    return builder.toString();
1346  }
1347
1348  public String getScanDetailsWithRequest(ScanRequest request) {
1349    try {
1350      if (!request.hasRegion()) {
1351        return null;
1352      }
1353      Region region = getRegion(request.getRegion());
1354      StringBuilder builder = new StringBuilder();
1355      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
1356      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
1357      for (NameBytesPair pair : request.getScan().getAttributeList()) {
1358        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
1359          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
1360          break;
1361        }
1362      }
1363      return builder.toString();
1364    } catch (IOException ignored) {
1365      return null;
1366    }
1367  }
1368
1369  /**
1370   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
1371   */
1372  long getScannerVirtualTime(long scannerId) {
1373    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
1374    return rsh == null ? 0L : rsh.getNextCallSeq();
1375  }
1376
1377  /**
1378   * Method to account for the size of retained cells and retained data blocks.
1379   * @param context   rpc call context
1380   * @param r         result to add size.
1381   * @param lastBlock last block to check whether we need to add the block size in context.
1382   * @return an object that represents the last referenced block from this response.
1383   */
1384  Object addSize(RpcCallContext context, Result r, Object lastBlock) {
1385    if (context != null && r != null && !r.isEmpty()) {
1386      for (Cell c : r.rawCells()) {
1387        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
1388
        // Since byte buffers can point to all kinds of places, it's harder to keep track
        // of which blocks are kept alive by which byte buffer, so we make a guess.
1392        if (c instanceof ByteBufferExtendedCell) {
1393          ByteBufferExtendedCell bbCell = (ByteBufferExtendedCell) c;
1394          ByteBuffer bb = bbCell.getValueByteBuffer();
1395          if (bb != lastBlock) {
1396            context.incrementResponseBlockSize(bb.capacity());
1397            lastBlock = bb;
1398          }
1399        } else {
          // We treat "the last block differs from the current value array" as a proxy for
          // pointing at a new block. This won't be exact: if multiple gets bounce back and forth
          // between blocks, this may over-count the size of referenced blocks. However, it's
          // better to over-count and use two RPCs than to OOME the regionserver.
1406          byte[] valueArray = c.getValueArray();
1407          if (valueArray != lastBlock) {
1408            context.incrementResponseBlockSize(valueArray.length);
1409            lastBlock = valueArray;
1410          }
1411        }
1412
1413      }
1414    }
1415    return lastBlock;
1416  }
1417
1418  /** Returns Remote client's ip and port else null if can't be determined. */
1419  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
1420      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
1421  static String getRemoteClientIpAndPort() {
1422    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
1423    if (rpcCall == null) {
1424      return HConstants.EMPTY_STRING;
1425    }
1426    InetAddress address = rpcCall.getRemoteAddress();
1427    if (address == null) {
1428      return HConstants.EMPTY_STRING;
1429    }
1430    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
1431    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
1432    // scanning than a hostname anyways.
1433    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
1434  }
1435
1436  /** Returns Remote client's username. */
1437  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
1438      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
1439  static String getUserName() {
1440    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
1441    if (rpcCall == null) {
1442      return HConstants.EMPTY_STRING;
1443    }
1444    return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING);
1445  }
1446
1447  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
1448    HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
1449    Lease lease = regionServer.getLeaseManager().createLease(scannerName,
1450      this.scannerLeaseTimeoutPeriod, new ScannerListener(scannerName));
1451    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
1452    RpcCallback closeCallback =
1453      s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s);
1454    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
1455      needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
1456    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
1457    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
1458      + scannerName + ", " + existing;
1459    return rsh;
1460  }
1461
1462  private boolean isFullRegionScan(Scan scan, HRegion region) {
    // Account this as a full region scan if the scan's start row is less than or equal to the
    // region's start key, and its stop row is greater than or equal to the region's end key
    // (when an end key is present) or the stop row is empty.
1467    if (
1468      Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0
1469        && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0
1470          && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)
1471          || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
1472    ) {
1473      return true;
1474    }
1475    return false;
1476  }
1477
1478  /**
1479   * Find the HRegion based on a region specifier
1480   * @param regionSpecifier the region specifier
1481   * @return the corresponding region
1482   * @throws IOException if the specifier is not null, but failed to find the region
1483   */
1484  public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException {
1485    return regionServer.getRegion(regionSpecifier.getValue().toByteArray());
1486  }
1487
1488  /**
1489   * Find the List of HRegions based on a list of region specifiers
1490   * @param regionSpecifiers the list of region specifiers
1491   * @return the corresponding list of regions
1492   * @throws IOException if any of the specifiers is not null, but failed to find the region
1493   */
1494  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
1495    final CacheEvictionStatsBuilder stats) {
1496    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
1497    for (RegionSpecifier regionSpecifier : regionSpecifiers) {
1498      try {
1499        regions.add(regionServer.getRegion(regionSpecifier.getValue().toByteArray()));
1500      } catch (NotServingRegionException e) {
1501        stats.addException(regionSpecifier.getValue().toByteArray(), e);
1502      }
1503    }
1504    return regions;
1505  }
1506
1507  public PriorityFunction getPriority() {
1508    return priority;
1509  }
1510
1511  public Configuration getConfiguration() {
1512    return regionServer.getConfiguration();
1513  }
1514
1515  private RegionServerRpcQuotaManager getRpcQuotaManager() {
1516    return regionServer.getRegionServerRpcQuotaManager();
1517  }
1518
1519  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
1520    return regionServer.getRegionServerSpaceQuotaManager();
1521  }
1522
1523  void start(ZKWatcher zkWatcher) {
1524    if (AccessChecker.isAuthorizationSupported(getConfiguration())) {
1525      accessChecker = new AccessChecker(getConfiguration());
1526    } else {
1527      accessChecker = new NoopAccessChecker(getConfiguration());
1528    }
1529    zkPermissionWatcher =
1530      new ZKPermissionWatcher(zkWatcher, accessChecker.getAuthManager(), getConfiguration());
1531    try {
1532      zkPermissionWatcher.start();
1533    } catch (KeeperException e) {
1534      LOG.error("ZooKeeper permission watcher initialization failed", e);
1535    }
1536    this.scannerIdGenerator = new ScannerIdGenerator(this.regionServer.serverName);
1537    rpcServer.start();
1538  }
1539
1540  void stop() {
1541    if (zkPermissionWatcher != null) {
1542      zkPermissionWatcher.close();
1543    }
1544    closeAllScanners();
1545    rpcServer.stop();
1546  }
1547
1548  /**
1549   * Called to verify that this server is up and running.
1550   */
1551  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
1552  protected void checkOpen() throws IOException {
1553    if (regionServer.isAborted()) {
1554      throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
1555    }
1556    if (regionServer.isStopped()) {
1557      throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
1558    }
1559    if (!regionServer.isDataFileSystemOk()) {
1560      throw new RegionServerStoppedException("File system not available");
1561    }
1562    if (!regionServer.isOnline()) {
1563      throw new ServerNotRunningYetException(
1564        "Server " + regionServer.serverName + " is not running yet");
1565    }
1566  }
1567
1568  /**
1569   * By default, put up an Admin and a Client Service. Set booleans
1570   * <code>hbase.regionserver.admin.executorService</code> and
1571   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
1572   * Default is that both are enabled.
1573   * @return immutable list of blocking services and the security info classes that this server
1574   *         supports
1575   */
1576  protected List<BlockingServiceAndInterface> getServices() {
1577    boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
1578    boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
1579    boolean clientMeta =
1580      getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
1581    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
1582    if (client) {
1583      bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
1584        ClientService.BlockingInterface.class));
1585    }
1586    if (admin) {
1587      bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
1588        AdminService.BlockingInterface.class));
1589    }
1590    if (clientMeta) {
1591      bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
1592        ClientMetaService.BlockingInterface.class));
1593    }
1594    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
1595  }
1596
1597  public InetSocketAddress getSocketAddress() {
1598    return isa;
1599  }
1600
1601  @Override
1602  public int getPriority(RequestHeader header, Message param, User user) {
1603    return priority.getPriority(header, param, user);
1604  }
1605
1606  @Override
1607  public long getDeadline(RequestHeader header, Message param) {
1608    return priority.getDeadline(header, param);
1609  }
1610
1611  /*
   * Check if the throwable is an OOME and, if so, abort immediately to avoid creating more
   * objects.
1613   * @return True if we OOME'd and are aborting.
1614   */
1615  @Override
1616  public boolean checkOOME(final Throwable e) {
1617    return exitIfOOME(e);
1618  }
1619
1620  public static boolean exitIfOOME(final Throwable e) {
1621    boolean stop = false;
1622    try {
1623      if (
1624        e instanceof OutOfMemoryError
1625          || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1626          || (e.getMessage() != null && e.getMessage().contains("java.lang.OutOfMemoryError"))
1627      ) {
1628        stop = true;
1629        LOG.error(HBaseMarkers.FATAL, "Run out of memory; " + RSRpcServices.class.getSimpleName()
1630          + " will abort itself immediately", e);
1631      }
1632    } finally {
1633      if (stop) {
1634        Runtime.getRuntime().halt(1);
1635      }
1636    }
1637    return stop;
1638  }
1639
1640  /**
1641   * Close a region on the region server.
1642   * @param controller the RPC controller
   * @param request    the request
1644   */
1645  @Override
1646  @QosPriority(priority = HConstants.ADMIN_QOS)
1647  public CloseRegionResponse closeRegion(final RpcController controller,
1648    final CloseRegionRequest request) throws ServiceException {
1649    final ServerName sn = (request.hasDestinationServer()
1650      ? ProtobufUtil.toServerName(request.getDestinationServer())
1651      : null);
1652
1653    try {
1654      checkOpen();
1655      throwOnWrongStartCode(request);
1656      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1657
1658      requestCount.increment();
1659      if (sn == null) {
1660        LOG.info("Close " + encodedRegionName + " without moving");
1661      } else {
1662        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1663      }
1664      boolean closed = regionServer.closeRegion(encodedRegionName, false, sn);
1665      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1666      return builder.build();
1667    } catch (IOException ie) {
1668      throw new ServiceException(ie);
1669    }
1670  }
1671
1672  /**
1673   * Compact a region on the region server.
1674   * @param controller the RPC controller
   * @param request    the request
1676   */
1677  @Override
1678  @QosPriority(priority = HConstants.ADMIN_QOS)
1679  public CompactRegionResponse compactRegion(final RpcController controller,
1680    final CompactRegionRequest request) throws ServiceException {
1681    try {
1682      checkOpen();
1683      requestCount.increment();
1684      HRegion region = getRegion(request.getRegion());
      // Reject the request if quota support is enabled, the requesting user is not a system/super
      // user, and a quota policy that disables compactions is being enforced.
1687      if (
1688        QuotaUtil.isQuotaEnabled(getConfiguration())
1689          && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null))
1690          && this.regionServer.getRegionServerSpaceQuotaManager()
1691            .areCompactionsDisabled(region.getTableDescriptor().getTableName())
1692      ) {
        throw new DoNotRetryIOException(
          "Compactions on this region are disabled due to a space quota violation.");
1695      }
1696      region.startRegionOperation(Operation.COMPACT_REGION);
1697      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1698      boolean major = request.hasMajor() && request.getMajor();
1699      if (request.hasFamily()) {
1700        byte[] family = request.getFamily().toByteArray();
1701        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
1702          + region.getRegionInfo().getRegionNameAsString() + " and family "
1703          + Bytes.toString(family);
1704        LOG.trace(log);
1705        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
1706          CompactionLifeCycleTracker.DUMMY);
1707      } else {
1708        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
1709          + region.getRegionInfo().getRegionNameAsString();
1710        LOG.trace(log);
1711        region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
1712      }
1713      return CompactRegionResponse.newBuilder().build();
1714    } catch (IOException ie) {
1715      throw new ServiceException(ie);
1716    }
1717  }
1718
1719  @Override
1720  public CompactionSwitchResponse compactionSwitch(RpcController controller,
1721    CompactionSwitchRequest request) throws ServiceException {
1722    rpcPreCheck("compactionSwitch");
1723    final CompactSplit compactSplitThread = regionServer.getCompactSplitThread();
1724    requestCount.increment();
1725    boolean prevState = compactSplitThread.isCompactionsEnabled();
1726    CompactionSwitchResponse response =
1727      CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
1728    if (prevState == request.getEnabled()) {
1729      // passed in requested state is same as current state. No action required
1730      return response;
1731    }
1732    compactSplitThread.switchCompaction(request.getEnabled());
1733    return response;
1734  }
1735
1736  /**
1737   * Flush a region on the region server.
1738   * @param controller the RPC controller
   * @param request    the request
1740   */
1741  @Override
1742  @QosPriority(priority = HConstants.ADMIN_QOS)
1743  public FlushRegionResponse flushRegion(final RpcController controller,
1744    final FlushRegionRequest request) throws ServiceException {
1745    try {
1746      checkOpen();
1747      requestCount.increment();
1748      HRegion region = getRegion(request.getRegion());
1749      LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1750      boolean shouldFlush = true;
1751      if (request.hasIfOlderThanTs()) {
1752        shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1753      }
1754      FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1755      if (shouldFlush) {
1756        boolean writeFlushWalMarker =
1757          request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false;
1758        // Go behind the curtain so we can manage writing of the flush WAL marker
1759        HRegion.FlushResultImpl flushResult = null;
1760        if (request.hasFamily()) {
          List<byte[]> families = new ArrayList<>();
1762          families.add(request.getFamily().toByteArray());
1763          flushResult =
1764            region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1765        } else {
1766          flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1767        }
1768        boolean compactionNeeded = flushResult.isCompactionNeeded();
1769        if (compactionNeeded) {
1770          regionServer.getCompactSplitThread().requestSystemCompaction(region,
1771            "Compaction through user triggered flush");
1772        }
1773        builder.setFlushed(flushResult.isFlushSucceeded());
1774        builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1775      }
1776      builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1777      return builder.build();
1778    } catch (DroppedSnapshotException ex) {
1779      // Cache flush can fail in a few places. If it fails in a critical
1780      // section, we get a DroppedSnapshotException and a replay of wal
1781      // is required. Currently the only way to do this is a restart of
1782      // the server.
1783      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1784      throw new ServiceException(ex);
1785    } catch (IOException ie) {
1786      throw new ServiceException(ie);
1787    }
1788  }
1789
1790  @Override
1791  @QosPriority(priority = HConstants.ADMIN_QOS)
1792  public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1793    final GetOnlineRegionRequest request) throws ServiceException {
1794    try {
1795      checkOpen();
1796      requestCount.increment();
1797      Map<String, HRegion> onlineRegions = regionServer.getOnlineRegions();
1798      List<RegionInfo> list = new ArrayList<>(onlineRegions.size());
1799      for (HRegion region : onlineRegions.values()) {
1800        list.add(region.getRegionInfo());
1801      }
1802      list.sort(RegionInfo.COMPARATOR);
1803      return ResponseConverter.buildGetOnlineRegionResponse(list);
1804    } catch (IOException ie) {
1805      throw new ServiceException(ie);
1806    }
1807  }
1808
1809  // Master implementation of this Admin Service differs given it is not
1810  // able to supply detail only known to RegionServer. See note on
1811  // MasterRpcServers#getRegionInfo.
1812  @Override
1813  @QosPriority(priority = HConstants.ADMIN_QOS)
1814  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
1815    final GetRegionInfoRequest request) throws ServiceException {
1816    try {
1817      checkOpen();
1818      requestCount.increment();
1819      HRegion region = getRegion(request.getRegion());
1820      RegionInfo info = region.getRegionInfo();
1821      byte[] bestSplitRow;
1822      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
1823        bestSplitRow = region.checkSplit(true).orElse(null);
1824        // when all table data are in memstore, bestSplitRow = null
1825        // try to flush region first
1826        if (bestSplitRow == null) {
1827          region.flush(true);
1828          bestSplitRow = region.checkSplit(true).orElse(null);
1829        }
1830      } else {
1831        bestSplitRow = null;
1832      }
1833      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
1834      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
1835      if (request.hasCompactionState() && request.getCompactionState()) {
1836        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
1837      }
1838      builder.setSplittable(region.isSplittable());
1839      builder.setMergeable(region.isMergeable());
1840      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
1841        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
1842      }
1843      return builder.build();
1844    } catch (IOException ie) {
1845      throw new ServiceException(ie);
1846    }
1847  }
1848
1849  @Override
1850  @QosPriority(priority = HConstants.ADMIN_QOS)
1851  public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
1852    throws ServiceException {
1853
1854    List<HRegion> regions;
1855    if (request.hasTableName()) {
1856      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
1857      regions = regionServer.getRegions(tableName);
1858    } else {
1859      regions = regionServer.getRegions();
1860    }
1861    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
1862    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
1863    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1864
1865    try {
1866      for (HRegion region : regions) {
1867        rLoads.add(regionServer.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
1868      }
1869    } catch (IOException e) {
1870      throw new ServiceException(e);
1871    }
1872    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
1873    builder.addAllRegionLoads(rLoads);
1874    return builder.build();
1875  }
1876
1877  @Override
1878  @QosPriority(priority = HConstants.ADMIN_QOS)
1879  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
1880    ClearCompactionQueuesRequest request) throws ServiceException {
1881    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
1882      + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
1883    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
1884    requestCount.increment();
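    // Only one clear-compaction-queues request may run at a time; the compare-and-set below
    // rejects concurrent attempts instead of queueing them.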
1885    if (clearCompactionQueues.compareAndSet(false, true)) {
1886      final CompactSplit compactSplitThread = regionServer.getCompactSplitThread();
1887      try {
1888        checkOpen();
1889        regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues();
1890        for (String queueName : request.getQueueNameList()) {
1891          LOG.debug("clear " + queueName + " compaction queue");
1892          switch (queueName) {
1893            case "long":
1894              compactSplitThread.clearLongCompactionsQueue();
1895              break;
1896            case "short":
1897              compactSplitThread.clearShortCompactionsQueue();
1898              break;
1899            default:
1900              LOG.warn("Unknown queue name " + queueName);
1901              throw new IOException("Unknown queue name " + queueName);
1902          }
1903        }
1904        regionServer.getRegionServerCoprocessorHost().postClearCompactionQueues();
1905      } catch (IOException ie) {
1906        throw new ServiceException(ie);
1907      } finally {
1908        clearCompactionQueues.set(false);
1909      }
1910    } else {
1911      LOG.warn("Clear compactions queue is executing by other admin.");
1912    }
1913    return respBuilder.build();
1914  }
1915
1916  /**
1917   * Get some information of the region server.
1918   * @param controller the RPC controller
   * @param request    the request
1920   */
1921  @Override
1922  @QosPriority(priority = HConstants.ADMIN_QOS)
1923  public GetServerInfoResponse getServerInfo(final RpcController controller,
1924    final GetServerInfoRequest request) throws ServiceException {
1925    try {
1926      checkOpen();
1927    } catch (IOException ie) {
1928      throw new ServiceException(ie);
1929    }
1930    requestCount.increment();
1931    int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
1932    return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
1933  }
1934
1935  @Override
1936  @QosPriority(priority = HConstants.ADMIN_QOS)
1937  public GetStoreFileResponse getStoreFile(final RpcController controller,
1938    final GetStoreFileRequest request) throws ServiceException {
1939    try {
1940      checkOpen();
1941      HRegion region = getRegion(request.getRegion());
1942      requestCount.increment();
1943      Set<byte[]> columnFamilies;
1944      if (request.getFamilyCount() == 0) {
1945        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
1946      } else {
1947        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
1948        for (ByteString cf : request.getFamilyList()) {
1949          columnFamilies.add(cf.toByteArray());
1950        }
1951      }
1952      int nCF = columnFamilies.size();
1953      List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
1954      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1955      builder.addAllStoreFile(fileList);
1956      return builder.build();
1957    } catch (IOException ie) {
1958      throw new ServiceException(ie);
1959    }
1960  }
1961
1962  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
1963    if (!request.hasServerStartCode()) {
1964      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
1965      return;
1966    }
1967    throwOnWrongStartCode(request.getServerStartCode());
1968  }
1969
1970  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
1971    if (!request.hasServerStartCode()) {
1972      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
1973      return;
1974    }
1975    throwOnWrongStartCode(request.getServerStartCode());
1976  }
1977
1978  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
1979    // check that we are the same server that this RPC is intended for.
1980    if (regionServer.serverName.getStartcode() != serverStartCode) {
1981      throw new ServiceException(new DoNotRetryIOException(
1982        "This RPC was intended for a " + "different server with startCode: " + serverStartCode
1983          + ", this server is: " + regionServer.serverName));
1984    }
1985  }
1986
1987  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
1988    if (req.getOpenRegionCount() > 0) {
1989      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
1990        throwOnWrongStartCode(openReq);
1991      }
1992    }
1993    if (req.getCloseRegionCount() > 0) {
1994      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
1995        throwOnWrongStartCode(closeReq);
1996      }
1997    }
1998  }
1999
2000  /**
   * Asynchronously open a region or a set of regions on the region server. The opening is
2002   * coordinated by ZooKeeper, and this method requires the znode to be created before being called.
2003   * As a consequence, this method should be called only from the master.
2004   * <p>
   * The different managed states for the region are:
2006   * </p>
2007   * <ul>
2008   * <li>region not opened: the region opening will start asynchronously.</li>
2009   * <li>a close is already in progress: this is considered as an error.</li>
2010   * <li>an open is already in progress: this new open request will be ignored. This is important
2011   * because the Master can do multiple requests if it crashes.</li>
2012   * <li>the region is already opened: this new open request will be ignored.</li>
2013   * </ul>
2014   * <p>
   * Bulk assign: If there is more than one region to open, it will be considered a bulk assign.
2016   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
2017   * errors are put in the response as FAILED_OPENING.
2018   * </p>
2019   * @param controller the RPC controller
   * @param request    the request
2021   */
2022  @Override
2023  @QosPriority(priority = HConstants.ADMIN_QOS)
2024  public OpenRegionResponse openRegion(final RpcController controller,
2025    final OpenRegionRequest request) throws ServiceException {
2026    requestCount.increment();
2027    throwOnWrongStartCode(request);
2028
2029    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
2030    final int regionCount = request.getOpenInfoCount();
2031    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
2032    final boolean isBulkAssign = regionCount > 1;
2033    try {
2034      checkOpen();
2035    } catch (IOException ie) {
2036      TableName tableName = null;
2037      if (regionCount == 1) {
2038        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
2039          request.getOpenInfo(0).getRegion();
2040        if (ri != null) {
2041          tableName = ProtobufUtil.toTableName(ri.getTableName());
2042        }
2043      }
2044      if (!TableName.META_TABLE_NAME.equals(tableName)) {
2045        throw new ServiceException(ie);
2046      }
2047      // We are assigning meta, wait a little for regionserver to finish initialization.
2048      // Default to quarter of RPC timeout
2049      int timeout = regionServer.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
2050        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
2051      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
2052      synchronized (regionServer.online) {
2053        try {
2054          while (
2055            EnvironmentEdgeManager.currentTime() <= endTime && !regionServer.isStopped()
2056              && !regionServer.isOnline()
2057          ) {
2058            regionServer.online.wait(regionServer.msgInterval);
2059          }
2060          checkOpen();
2061        } catch (InterruptedException t) {
2062          Thread.currentThread().interrupt();
2063          throw new ServiceException(t);
2064        } catch (IOException e) {
2065          throw new ServiceException(e);
2066        }
2067      }
2068    }
2069
2070    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
2071
2072    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
2073      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
2074      TableDescriptor htd;
2075      try {
2076        String encodedName = region.getEncodedName();
2077        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
2078        final HRegion onlineRegion = regionServer.getRegion(encodedName);
2079        if (onlineRegion != null) {
2080          // The region is already online. This should not happen any more.
2081          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
2082            + ", which is already online";
2083          LOG.warn(error);
2084          // regionServer.abort(error);
2085          // throw new IOException(error);
2086          builder.addOpeningState(RegionOpeningState.OPENED);
2087          continue;
2088        }
2089        LOG.info("Open " + region.getRegionNameAsString());
2090
2091        final Boolean previous =
2092          regionServer.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);
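        // previous is the prior in-transition state for this region: null means no transition was
        // in progress, FALSE means a close was in progress, TRUE means an open is already running.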
2093
2094        if (Boolean.FALSE.equals(previous)) {
2095          if (regionServer.getRegion(encodedName) != null) {
2096            // There is a close in progress. This should not happen any more.
2097            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
2098              + ", which we are already trying to CLOSE";
2099            regionServer.abort(error);
2100            throw new IOException(error);
2101          }
2102          regionServer.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
2103        }
2104
2105        if (Boolean.TRUE.equals(previous)) {
2106          // An open is in progress. This is supported, but let's log this.
2107          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
2108            + ", which we are already trying to OPEN"
2109            + " - ignoring this new request for this region.");
2110        }
2111
2112        // We are opening this region. If it moves back and forth for whatever reason, we don't
2113        // want to keep returning the stale moved record while we are opening/if we close again.
2114        regionServer.removeFromMovedRegions(region.getEncodedName());
2115
2116        if (previous == null || !previous.booleanValue()) {
2117          htd = htds.get(region.getTable());
2118          if (htd == null) {
2119            htd = regionServer.tableDescriptors.get(region.getTable());
2120            htds.put(region.getTable(), htd);
2121          }
2122          if (htd == null) {
2123            throw new IOException("Missing table descriptor for " + region.getEncodedName());
2124          }
2125          // If there is no action in progress, we can submit a specific handler.
2126          // Need to pass the expected version in the constructor.
2127          if (regionServer.executorService == null) {
2128            LOG.info("No executor executorService; skipping open request");
2129          } else {
2130            if (region.isMetaRegion()) {
2131              regionServer.executorService.submit(
2132                new OpenMetaHandler(regionServer, regionServer, region, htd, masterSystemTime));
2133            } else {
2134              if (regionOpenInfo.getFavoredNodesCount() > 0) {
2135                regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
2136                  regionOpenInfo.getFavoredNodesList());
2137              }
2138              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
2139                regionServer.executorService.submit(new OpenPriorityRegionHandler(regionServer,
2140                  regionServer, region, htd, masterSystemTime));
2141              } else {
2142                regionServer.executorService.submit(
2143                  new OpenRegionHandler(regionServer, regionServer, region, htd, masterSystemTime));
2144              }
2145            }
2146          }
2147        }
2148
2149        builder.addOpeningState(RegionOpeningState.OPENED);
2150      } catch (IOException ie) {
2151        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
2152        if (isBulkAssign) {
2153          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
2154        } else {
2155          throw new ServiceException(ie);
2156        }
2157      }
2158    }
2159    return builder.build();
2160  }
2161
2162  /**
   * Warmup a region on this server. This method should only be called by the Master. It
   * synchronously opens and then closes the region, bringing its most important pages into the
   * cache.
2165   */
2166  @Override
2167  public WarmupRegionResponse warmupRegion(final RpcController controller,
2168    final WarmupRegionRequest request) throws ServiceException {
2169    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
2170    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
2171    try {
2172      checkOpen();
2173      String encodedName = region.getEncodedName();
2174      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
2175      final HRegion onlineRegion = regionServer.getRegion(encodedName);
2176      if (onlineRegion != null) {
2177        LOG.info("{} is online; skipping warmup", region);
2178        return response;
2179      }
2180      TableDescriptor htd = regionServer.tableDescriptors.get(region.getTable());
2181      if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
2182        LOG.info("{} is in transition; skipping warmup", region);
2183        return response;
2184      }
2185      LOG.info("Warmup {}", region.getRegionNameAsString());
2186      HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
2187        regionServer.getConfiguration(), regionServer, null);
2188    } catch (IOException ie) {
2189      LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
2190      throw new ServiceException(ie);
2191    }
2192
2193    return response;
2194  }
2195
2196  /**
   * Replay the given changes when distributedLogReplay replays WAL edits from a failed RS. The
   * guarantee is that the given mutations will be durable on the receiving RS if this method
   * returns without any exception.
   * @param controller the RPC controller
   * @param request    the request
2202   */
2203  @Override
2204  @QosPriority(priority = HConstants.REPLAY_QOS)
2205  public ReplicateWALEntryResponse replay(final RpcController controller,
2206    final ReplicateWALEntryRequest request) throws ServiceException {
2207    long before = EnvironmentEdgeManager.currentTime();
2208    CellScanner cells = ((HBaseRpcController) controller).cellScanner();
2209    ((HBaseRpcController) controller).setCellScanner(null);
2210    try {
2211      checkOpen();
2212      List<WALEntry> entries = request.getEntryList();
2213      if (entries == null || entries.isEmpty()) {
2214        // empty input
2215        return ReplicateWALEntryResponse.newBuilder().build();
2216      }
2217      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
2218      HRegion region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
2219      RegionCoprocessorHost coprocessorHost =
2220        ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
2221          ? region.getCoprocessorHost()
2222          : null; // do not invoke coprocessors if this is a secondary region replica
2223      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();
2224
2225      // Skip adding the edits to WAL if this is a secondary region replica
2226      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
2227      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
2228
2229      for (WALEntry entry : entries) {
2230        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
2231          throw new NotServingRegionException("Replay request contains entries from multiple "
2232            + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
2233            + entry.getKey().getEncodedRegionName());
2234        }
2235        if (regionServer.nonceManager != null && isPrimary) {
2236          long nonceGroup =
2237            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
2238          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
2239          regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce,
2240            entry.getKey().getWriteTime());
2241        }
2242        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
2243        List<MutationReplay> edits =
2244          WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
2245        if (coprocessorHost != null) {
2246          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
2247          // KeyValue.
2248          if (
2249            coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
2250              walEntry.getSecond())
2251          ) {
2252            // if bypass this log entry, ignore it ...
2253            continue;
2254          }
2255          walEntries.add(walEntry);
2256        }
2257        if (edits != null && !edits.isEmpty()) {
2258          // HBASE-17924
2259          // sort to improve lock efficiency
2260          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
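          // Use the original sequence number when present; it preserves the sequence id the edits
          // were originally written with on the source region server.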
2261          long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
2262            ? entry.getKey().getOrigSequenceNumber()
2263            : entry.getKey().getLogSequenceNumber();
2264          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
2265          // check if it's a partial success
2266          for (int i = 0; result != null && i < result.length; i++) {
2267            if (result[i] != OperationStatus.SUCCESS) {
2268              throw new IOException(result[i].getExceptionMsg());
2269            }
2270          }
2271        }
2272      }
2273
2274      // sync wal at the end because ASYNC_WAL is used above
2275      WAL wal = region.getWAL();
2276      if (wal != null) {
2277        wal.sync();
2278      }
2279
2280      if (coprocessorHost != null) {
2281        for (Pair<WALKey, WALEdit> entry : walEntries) {
2282          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
2283            entry.getSecond());
2284        }
2285      }
2286      return ReplicateWALEntryResponse.newBuilder().build();
2287    } catch (IOException ie) {
2288      throw new ServiceException(ie);
2289    } finally {
2290      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
2291      if (metricsRegionServer != null) {
2292        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
2293      }
2294    }
2295  }
2296
2297  /**
2298   * Replicate WAL entries on the region server.
2299   * @param controller the RPC controller
2300   * @param request    the request
2301   */
2302  @Override
2303  @QosPriority(priority = HConstants.REPLICATION_QOS)
2304  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
2305    final ReplicateWALEntryRequest request) throws ServiceException {
2306    try {
2307      checkOpen();
2308      if (regionServer.getReplicationSinkService() != null) {
2309        requestCount.increment();
2310        List<WALEntry> entries = request.getEntryList();
2311        CellScanner cellScanner = ((HBaseRpcController) controller).cellScanner();
2312        ((HBaseRpcController) controller).setCellScanner(null);
2313        regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries();
2314        regionServer.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
2315          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
2316          request.getSourceHFileArchiveDirPath());
2317        regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries();
2318        return ReplicateWALEntryResponse.newBuilder().build();
2319      } else {
2320        throw new ServiceException("Replication services are not initialized yet");
2321      }
2322    } catch (IOException ie) {
2323      throw new ServiceException(ie);
2324    }
2325  }
2326
2327  /**
2328   * Roll the WAL writer of the region server.
2329   * @param controller the RPC controller
   * @param request    the request
2331   */
2332  @Override
2333  public RollWALWriterResponse rollWALWriter(final RpcController controller,
2334    final RollWALWriterRequest request) throws ServiceException {
2335    try {
2336      checkOpen();
2337      requestCount.increment();
2338      regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
2339      regionServer.getWalRoller().requestRollAll();
2340      regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
2341      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
2342      return builder.build();
2343    } catch (IOException ie) {
2344      throw new ServiceException(ie);
2345    }
2346  }
2347
2348  /**
2349   * Stop the region server.
2350   * @param controller the RPC controller
   * @param request    the request
2352   */
2353  @Override
2354  @QosPriority(priority = HConstants.ADMIN_QOS)
2355  public StopServerResponse stopServer(final RpcController controller,
2356    final StopServerRequest request) throws ServiceException {
2357    rpcPreCheck("stopServer");
2358    requestCount.increment();
2359    String reason = request.getReason();
2360    regionServer.stop(reason);
2361    return StopServerResponse.newBuilder().build();
2362  }
2363
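  /**
   * Update the favored nodes mapping for the regions named in the request. Entries without any
   * favored nodes are skipped; the response carries the number of regions in the request.
   */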
2364  @Override
2365  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
2366    UpdateFavoredNodesRequest request) throws ServiceException {
2367    rpcPreCheck("updateFavoredNodes");
2368    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
2369    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
2370    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
2371      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
2372      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
2373        regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
2374          regionUpdateInfo.getFavoredNodesList());
2375      }
2376    }
2377    respBuilder.setResponse(openInfoList.size());
2378    return respBuilder.build();
2379  }
2380
2381  /**
2382   * Atomically bulk load several HFiles into an open region
2383   * @return true if successful, false if failed recoverably (no action taken)
2384   * @throws ServiceException if failed unrecoverably
2385   */
2386  @Override
2387  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
2388    final BulkLoadHFileRequest request) throws ServiceException {
2389    long start = EnvironmentEdgeManager.currentTime();
2390    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
2391    if (clusterIds.contains(this.regionServer.clusterId)) {
2392      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
2393    } else {
2394      clusterIds.add(this.regionServer.clusterId);
2395    }
2396    try {
2397      checkOpen();
2398      requestCount.increment();
2399      HRegion region = getRegion(request.getRegion());
2400      Map<byte[], List<Path>> map = null;
2401      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
2402      long sizeToBeLoaded = -1;
2403
2404      // Check to see if this bulk load would exceed the space quota for this table
2405      if (spaceQuotaEnabled) {
2406        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
2407        SpaceViolationPolicyEnforcement enforcement =
2408          activeSpaceQuotas.getPolicyEnforcement(region);
2409        if (enforcement != null) {
2410          // Bulk loads must still be atomic. We must enact all or none.
2411          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
2412          for (FamilyPath familyPath : request.getFamilyPathList()) {
2413            filePaths.add(familyPath.getPath());
2414          }
2415          // Check if the batch of files exceeds the current quota
2416          sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths);
2417        }
2418      }
2419
2420      List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
2421      for (FamilyPath familyPath : request.getFamilyPathList()) {
2422        familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath()));
2423      }
2424      if (!request.hasBulkToken()) {
2425        if (region.getCoprocessorHost() != null) {
2426          region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
2427        }
2428        try {
2429          map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
2430            request.getCopyFile(), clusterIds, request.getReplicate());
2431        } finally {
2432          if (region.getCoprocessorHost() != null) {
2433            region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
2434          }
2435        }
2436      } else {
2437        // secure bulk load
2438        map =
2439          regionServer.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
2440      }
2441      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
2442      builder.setLoaded(map != null);
2443      if (map != null) {
2444        // Treat any negative size as a flag to "ignore" updating the region size, as a negative
2445        // size cannot occur in real life (you cannot bulk load a file with negative size)
2446        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
2447          if (LOG.isTraceEnabled()) {
2448            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
2449              + sizeToBeLoaded + " bytes");
2450          }
2451          // Inform space quotas of the new files for this region
2452          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
2453            sizeToBeLoaded);
2454        }
2455      }
2456      return builder.build();
2457    } catch (IOException ie) {
2458      throw new ServiceException(ie);
2459    } finally {
2460      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
2461      if (metricsRegionServer != null) {
2462        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
2463      }
2464    }
2465  }
2466
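  /**
   * Prepare a bulk load for the given region via the SecureBulkLoadManager and return the bulk
   * token to be passed to the subsequent bulkLoadHFile and cleanupBulkLoad calls.
   */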
2467  @Override
2468  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
2469    PrepareBulkLoadRequest request) throws ServiceException {
2470    try {
2471      checkOpen();
2472      requestCount.increment();
2473
2474      HRegion region = getRegion(request.getRegion());
2475
2476      String bulkToken = regionServer.getSecureBulkLoadManager().prepareBulkLoad(region, request);
2477      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
2478      builder.setBulkToken(bulkToken);
2479      return builder.build();
2480    } catch (IOException ie) {
2481      throw new ServiceException(ie);
2482    }
2483  }
2484
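  /**
   * Clean up the state created by an earlier prepareBulkLoad call for the given region, delegating
   * to the SecureBulkLoadManager.
   */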
2485  @Override
2486  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
2487    CleanupBulkLoadRequest request) throws ServiceException {
2488    try {
2489      checkOpen();
2490      requestCount.increment();
2491
2492      HRegion region = getRegion(request.getRegion());
2493
2494      regionServer.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
2495      return CleanupBulkLoadResponse.newBuilder().build();
2496    } catch (IOException ie) {
2497      throw new ServiceException(ie);
2498    }
2499  }
2500
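  /**
   * Execute a region-scoped coprocessor endpoint call and wrap the serialized result, keyed by the
   * region name, in a CoprocessorServiceResponse.
   */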
2501  @Override
2502  public CoprocessorServiceResponse execService(final RpcController controller,
2503    final CoprocessorServiceRequest request) throws ServiceException {
2504    try {
2505      checkOpen();
2506      requestCount.increment();
2507      HRegion region = getRegion(request.getRegion());
2508      com.google.protobuf.Message result = execServiceOnRegion(region, request.getCall());
2509      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
2510      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
2511        region.getRegionInfo().getRegionName()));
2512      // TODO: COPIES!!!!!!
2513      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
2514        org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
2515      return builder.build();
2516    } catch (IOException ie) {
2517      throw new ServiceException(ie);
2518    }
2519  }
2520
2521  private com.google.protobuf.Message execServiceOnRegion(HRegion region,
2522    final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
2523    // ignore the passed in controller (from the serialized call)
2524    ServerRpcController execController = new ServerRpcController();
2525    return region.execService(execController, serviceCall);
2526  }
2527
2528  /**
2529   * Get data from a table.
2530   * @param controller the RPC controller
2531   * @param request    the get request
2532   */
2533  @Override
2534  public GetResponse get(final RpcController controller, final GetRequest request)
2535    throws ServiceException {
2536    long before = EnvironmentEdgeManager.currentTime();
2537    OperationQuota quota = null;
2538    HRegion region = null;
2539    try {
2540      checkOpen();
2541      requestCount.increment();
2542      rpcGetRequestCount.increment();
2543      region = getRegion(request.getRegion());
2544
2545      GetResponse.Builder builder = GetResponse.newBuilder();
2546      ClientProtos.Get get = request.getGet();
2547      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
2548      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
2549      // to switch and do the hbase2 protocol (HBase servers do not tell clients what versions
2550      // they are; it's a problem for non-native clients like asynchbase). See HBASE-20225.
2551      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
2552        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
2553          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
2554          + "reverse Scan.");
2555      }
2556      Boolean existence = null;
2557      Result r = null;
2558      RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2559      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
2560
2561      Get clientGet = ProtobufUtil.toGet(get);
2562      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2563        existence = region.getCoprocessorHost().preExists(clientGet);
2564      }
2565      if (existence == null) {
2566        if (context != null) {
2567          r = get(clientGet, (region), null, context);
2568        } else {
2569          // for test purposes
2570          r = region.get(clientGet);
2571        }
2572        if (get.getExistenceOnly()) {
2573          boolean exists = r.getExists();
2574          if (region.getCoprocessorHost() != null) {
2575            exists = region.getCoprocessorHost().postExists(clientGet, exists);
2576          }
2577          existence = exists;
2578        }
2579      }
2580      if (existence != null) {
2581        ClientProtos.Result pbr =
2582          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
2583        builder.setResult(pbr);
2584      } else if (r != null) {
2585        ClientProtos.Result pbr;
2586        if (
2587          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
2588            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
2589        ) {
2590          pbr = ProtobufUtil.toResultNoData(r);
2591          ((HBaseRpcController) controller)
2592            .setCellScanner(CellUtil.createCellScanner(r.rawCells()));
2593          addSize(context, r, null);
2594        } else {
2595          pbr = ProtobufUtil.toResult(r);
2596        }
2597        builder.setResult(pbr);
2598      }
2599      // r.cells is null for a table.exists(get) call
2600      if (r != null && r.rawCells() != null) {
2601        quota.addGetResult(r);
2602      }
2603      return builder.build();
2604    } catch (IOException ie) {
2605      throw new ServiceException(ie);
2606    } finally {
2607      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
2608      if (metricsRegionServer != null) {
2609        TableDescriptor td = region != null ? region.getTableDescriptor() : null;
2610        if (td != null) {
2611          metricsRegionServer.updateGet(td.getTableName(),
2612            EnvironmentEdgeManager.currentTime() - before);
2613        }
2614      }
2615      if (quota != null) {
2616        quota.close();
2617      }
2618    }
2619  }
2620
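  /**
   * Server-side execution of a single Get. The scanner that serves the Get is registered with the
   * RpcCallContext (directly, or through the supplied closeCallBack when invoked from multi()) so
   * that the rpc callback closes it once the call completes.
   */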
2621  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
2622    RpcCallContext context) throws IOException {
2623    region.prepareGet(get);
2624    boolean stale = region.getRegionInfo().getReplicaId() != 0;
2625
2626    // This method is almost the same as HRegion#get.
2627    List<Cell> results = new ArrayList<>();
2628    long before = EnvironmentEdgeManager.currentTime();
2629    // pre-get CP hook
2630    if (region.getCoprocessorHost() != null) {
2631      if (region.getCoprocessorHost().preGet(get, results)) {
2632        region.metricsUpdateForGet(results, before);
2633        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
2634          stale);
2635      }
2636    }
2637    Scan scan = new Scan(get);
2638    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
2639      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
2640    }
2641    RegionScannerImpl scanner = null;
2642    try {
2643      scanner = region.getScanner(scan);
2644      scanner.next(results);
2645    } finally {
2646      if (scanner != null) {
2647        if (closeCallBack == null) {
2648          // If there is a context then the scanner can be added to the current
2649          // RpcCallContext. The rpc callback will take care of closing the
2650          // scanner, e.g. in the case
2651          // of a plain get().
2652          context.setCallBack(scanner);
2653        } else {
2654          // The call is from multi() where the results from the get() are
2655          // aggregated and then sent out in the
2656          // rpc. The rpc callback will close all such scanners created as part
2657          // of multi().
2658          closeCallBack.addScanner(scanner);
2659        }
2660      }
2661    }
2662
2663    // post-get CP hook
2664    if (region.getCoprocessorHost() != null) {
2665      region.getCoprocessorHost().postGet(get, results);
2666    }
2667    region.metricsUpdateForGet(results, before);
2668
2669    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
2670  }
2671
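  /**
   * Sum the number of actions across all RegionActions in the request, warn when the total exceeds
   * rowSizeWarnThreshold, and reject the batch if rejectRowsWithSizeOverThreshold is enabled.
   */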
2672  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
2673    int sum = 0;
2674    String firstRegionName = null;
2675    for (RegionAction regionAction : request.getRegionActionList()) {
2676      if (sum == 0) {
2677        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
2678      }
2679      sum += regionAction.getActionCount();
2680    }
2681    if (sum > rowSizeWarnThreshold) {
2682      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
2683        + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
2684        + RpcServer.getRequestUserName().orElse(null) + "/"
2685        + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
2686      if (rejectRowsWithSizeOverThreshold) {
2687        throw new ServiceException(
2688          "Rejecting large batch operation for current batch with firstRegionName: "
2689            + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
2690            + rowSizeWarnThreshold);
2691      }
2692    }
2693  }
2694
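  /**
   * Record a failure for a whole RegionAction: count the exception in the rpc metrics, attach it
   * to the RegionAction result, and skip the cells in the CellScanner belonging to its mutations.
   */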
2695  private void failRegionAction(MultiResponse.Builder responseBuilder,
2696    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
2697    CellScanner cellScanner, Throwable error) {
2698    rpcServer.getMetrics().exception(error);
2699    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
2700    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2701    // None of the Mutations in this RegionAction were executed, as the Region is not online
2702    // on this RS. They will be retried from the Client. Skip all the Cells in the CellScanner
2703    // corresponding to these Mutations.
2704    if (cellScanner != null) {
2705      skipCellsForMutations(regionAction.getActionList(), cellScanner);
2706    }
2707  }
2708
2709  /**
2710   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2711   * @param rpcc    the RPC controller
2712   * @param request the multi request
2713   */
2714  @Override
2715  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2716    throws ServiceException {
2717    try {
2718      checkOpen();
2719    } catch (IOException ie) {
2720      throw new ServiceException(ie);
2721    }
2722
2723    checkBatchSizeAndLogLargeSize(request);
2724
2725    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2726    // It is also the conduit via which we pass back data.
2727    HBaseRpcController controller = (HBaseRpcController) rpcc;
2728    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2729    if (controller != null) {
2730      controller.setCellScanner(null);
2731    }
2732
2733    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2734
2735    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2736    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2737    this.rpcMultiRequestCount.increment();
2738    this.requestCount.increment();
2739    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2740
2741    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
2742    // following logic is for backward compatibility as old clients still use
2743    // MultiRequest#condition in case of checkAndMutate with RowMutations.
2744    if (request.hasCondition()) {
2745      if (request.getRegionActionList().isEmpty()) {
2746        // If the region action list is empty, do nothing.
2747        responseBuilder.setProcessed(true);
2748        return responseBuilder.build();
2749      }
2750
2751      RegionAction regionAction = request.getRegionAction(0);
2752
2753      // When request.hasCondition() is true, regionAction.getAtomic() should always be true. So
2754      // we can assume regionAction.getAtomic() is true here.
2755      assert regionAction.getAtomic();
2756
2757      OperationQuota quota;
2758      HRegion region;
2759      RegionSpecifier regionSpecifier = regionAction.getRegion();
2760
2761      try {
2762        region = getRegion(regionSpecifier);
2763        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
2764      } catch (IOException e) {
2765        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2766        return responseBuilder.build();
2767      }
2768
2769      try {
2770        CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2771          cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
2772        responseBuilder.setProcessed(result.isSuccess());
2773        ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2774          ClientProtos.ResultOrException.newBuilder();
2775        for (int i = 0; i < regionAction.getActionCount(); i++) {
2776          // To unify the response format with doNonAtomicRegionMutation and read through
2777          // client's AsyncProcess we have to add an empty result instance per operation
2778          resultOrExceptionOrBuilder.clear();
2779          resultOrExceptionOrBuilder.setIndex(i);
2780          regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2781        }
2782      } catch (IOException e) {
2783        rpcServer.getMetrics().exception(e);
2784        // As this is an atomic conditional operation, any failure is treated as a global failure.
2785        regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2786      } finally {
2787        quota.close();
2788      }
2789
2790      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2791      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2792      if (regionLoadStats != null) {
2793        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
2794          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
2795      }
2796      return responseBuilder.build();
2797    }
2798
2799    // this will contain all the cells that we need to return. It's created later, if needed.
2800    List<CellScannable> cellsToReturn = null;
2801    RegionScannersCloseCallBack closeCallBack = null;
2802    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2803    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
2804      new HashMap<>(request.getRegionActionCount());
2805
2806    for (RegionAction regionAction : request.getRegionActionList()) {
2807      OperationQuota quota;
2808      HRegion region;
2809      RegionSpecifier regionSpecifier = regionAction.getRegion();
2810      regionActionResultBuilder.clear();
2811
2812      try {
2813        region = getRegion(regionSpecifier);
2814        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
2815      } catch (IOException e) {
2816        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2817        continue; // For this region it's a failure.
2818      }
2819
2820      try {
2821        if (regionAction.hasCondition()) {
2822          try {
2823            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2824              ClientProtos.ResultOrException.newBuilder();
2825            if (regionAction.getActionCount() == 1) {
2826              CheckAndMutateResult result =
2827                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
2828                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
2829              regionActionResultBuilder.setProcessed(result.isSuccess());
2830              resultOrExceptionOrBuilder.setIndex(0);
2831              if (result.getResult() != null) {
2832                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
2833              }
2834              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2835            } else {
2836              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2837                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
2838              regionActionResultBuilder.setProcessed(result.isSuccess());
2839              for (int i = 0; i < regionAction.getActionCount(); i++) {
2840                if (i == 0 && result.getResult() != null) {
2841                  // Set the result of the Increment/Append operations to the first element of the
2842                  // ResultOrException list
2843                  resultOrExceptionOrBuilder.setIndex(i);
2844                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
2845                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
2846                  continue;
2847                }
2848                // To unify the response format with doNonAtomicRegionMutation and read through
2849                // client's AsyncProcess we have to add an empty result instance per operation
2850                resultOrExceptionOrBuilder.clear();
2851                resultOrExceptionOrBuilder.setIndex(i);
2852                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2853              }
2854            }
2855          } catch (IOException e) {
2856            rpcServer.getMetrics().exception(e);
2857            // As this is an atomic conditional operation, any failure is treated as global.
2858            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2859          }
2860        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2861          try {
2862            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
2863              cellScanner, nonceGroup, spaceQuotaEnforcement);
2864            regionActionResultBuilder.setProcessed(true);
2865            // We no longer use MultiResponse#processed. Instead, we use
2866            // RegionActionResult#processed. This is for backward compatibility for old clients.
2867            responseBuilder.setProcessed(true);
2868          } catch (IOException e) {
2869            rpcServer.getMetrics().exception(e);
2870            // As this is atomic, any failure is treated as a global failure.
2871            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2872          }
2873        } else {
2874          // doNonAtomicRegionMutation manages the exception internally
2875          if (context != null && closeCallBack == null) {
2876            // An RpcCallBack that collects the scanners on which the callBack
2877            // operation must be performed on completion of the multi gets.
2878            // Set this only once
2879            closeCallBack = new RegionScannersCloseCallBack();
2880            context.setCallBack(closeCallBack);
2881          }
2882          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2883            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
2884            spaceQuotaEnforcement);
2885        }
2886      } finally {
2887        quota.close();
2888      }
2889
2890      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2891      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2892      if (regionLoadStats != null) {
2893        regionStats.put(regionSpecifier, regionLoadStats);
2894      }
2895    }
2896    // Load the controller with the Cells to return.
2897    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2898      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2899    }
2900
2901    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
2902    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
2903      builder.addRegion(stat.getKey());
2904      builder.addStat(stat.getValue());
2905    }
2906    responseBuilder.setRegionStatistics(builder);
2907    return responseBuilder.build();
2908  }
2909
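  /** Skip the cells in the CellScanner that belong to the mutations of each given action. */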
2910  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2911    if (cellScanner == null) {
2912      return;
2913    }
2914    for (Action action : actions) {
2915      skipCellsForMutation(action, cellScanner);
2916    }
2917  }
2918
2919  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2920    if (cellScanner == null) {
2921      return;
2922    }
2923    try {
2924      if (action.hasMutation()) {
2925        MutationProto m = action.getMutation();
2926        if (m.hasAssociatedCellCount()) {
2927          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2928            cellScanner.advance();
2929          }
2930        }
2931      }
2932    } catch (IOException e) {
2933      // No need to handle this individual Mutation level issue. This entire RegionAction is
2934      // already marked as failed because we could not see the Region here. On the client side
2935      // the top level RegionAction exception will be considered first.
2936      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2937    }
2938  }
2939
2940  /**
2941   * Mutate data in a table.
2942   * @param rpcc    the RPC controller
2943   * @param request the mutate request
2944   */
2945  @Override
2946  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
2947    throws ServiceException {
2948    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2949    // It is also the conduit via which we pass back data.
2950    HBaseRpcController controller = (HBaseRpcController) rpcc;
2951    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2952    OperationQuota quota = null;
2953    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2954    // Clear scanner so we are not holding on to reference across call.
2955    if (controller != null) {
2956      controller.setCellScanner(null);
2957    }
2958    try {
2959      checkOpen();
2960      requestCount.increment();
2961      rpcMutateRequestCount.increment();
2962      HRegion region = getRegion(request.getRegion());
2963      MutateResponse.Builder builder = MutateResponse.newBuilder();
2964      MutationProto mutation = request.getMutation();
2965      if (!region.getRegionInfo().isMetaRegion()) {
2966        regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
2967      }
2968      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2969      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
2970      ActivePolicyEnforcement spaceQuotaEnforcement =
2971        getSpaceQuotaManager().getActiveEnforcements();
2972
2973      if (request.hasCondition()) {
2974        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
2975          request.getCondition(), nonceGroup, spaceQuotaEnforcement);
2976        builder.setProcessed(result.isSuccess());
2977        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
2978        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
2979        if (clientCellBlockSupported) {
2980          addSize(context, result.getResult(), null);
2981        }
2982      } else {
2983        Result r = null;
2984        Boolean processed = null;
2985        MutationType type = mutation.getMutateType();
2986        switch (type) {
2987          case APPEND:
2988            // TODO: this doesn't actually check anything.
2989            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
2990            break;
2991          case INCREMENT:
2992            // TODO: this doesn't actually check anything.
2993            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
2994            break;
2995          case PUT:
2996            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
2997            processed = Boolean.TRUE;
2998            break;
2999          case DELETE:
3000            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
3001            processed = Boolean.TRUE;
3002            break;
3003          default:
3004            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
3005        }
3006        if (processed != null) {
3007          builder.setProcessed(processed);
3008        }
3009        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
3010        addResult(builder, r, controller, clientCellBlockSupported);
3011        if (clientCellBlockSupported) {
3012          addSize(context, r, null);
3013        }
3014      }
3015      return builder.build();
3016    } catch (IOException ie) {
3017      regionServer.checkFileSystem();
3018      throw new ServiceException(ie);
3019    } finally {
3020      if (quota != null) {
3021        quota.close();
3022      }
3023    }
3024  }
3025
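  /**
   * Execute a single Put after the cell size limit, space quota and operation quota checks, and
   * record the put latency in the region server metrics.
   */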
3026  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
3027    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
3028    long before = EnvironmentEdgeManager.currentTime();
3029    Put put = ProtobufUtil.toPut(mutation, cellScanner);
3030    checkCellSizeLimit(region, put);
3031    spaceQuota.getPolicyEnforcement(region).check(put);
3032    quota.addMutation(put);
3033    region.put(put);
3034
3035    MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
3036    if (metricsRegionServer != null) {
3037      long after = EnvironmentEdgeManager.currentTime();
3038      metricsRegionServer.updatePut(region.getRegionInfo().getTable(), after - before);
3039    }
3040  }
3041
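  /**
   * Execute a single Delete after the cell size limit, space quota and operation quota checks, and
   * record the delete latency in the region server metrics.
   */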
3042  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
3043    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
3044    long before = EnvironmentEdgeManager.currentTime();
3045    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
3046    checkCellSizeLimit(region, delete);
3047    spaceQuota.getPolicyEnforcement(region).check(delete);
3048    quota.addMutation(delete);
3049    region.delete(delete);
3050
3051    MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
3052    if (metricsRegionServer != null) {
3053      long after = EnvironmentEdgeManager.currentTime();
3054      metricsRegionServer.updateDelete(region.getRegionInfo().getTable(), after - before);
3055    }
3056  }
3057
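  /**
   * Execute a single checkAndMutate, honoring the pre/postCheckAndMutate coprocessor hooks, and
   * update the checkAndMutate (plus checkAndPut/checkAndDelete) latency metrics.
   */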
3058  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
3059    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
3060    ActivePolicyEnforcement spaceQuota) throws IOException {
3061    long before = EnvironmentEdgeManager.currentTime();
3062    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
3063    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
3064    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
3065    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
3066    quota.addMutation((Mutation) checkAndMutate.getAction());
3067
3068    CheckAndMutateResult result = null;
3069    if (region.getCoprocessorHost() != null) {
3070      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
3071    }
3072    if (result == null) {
3073      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
3074      if (region.getCoprocessorHost() != null) {
3075        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
3076      }
3077    }
3078    MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
3079    if (metricsRegionServer != null) {
3080      long after = EnvironmentEdgeManager.currentTime();
3081      metricsRegionServer.updateCheckAndMutate(region.getRegionInfo().getTable(), after - before);
3082
3083      MutationType type = mutation.getMutateType();
3084      switch (type) {
3085        case PUT:
3086          metricsRegionServer.updateCheckAndPut(region.getRegionInfo().getTable(), after - before);
3087          break;
3088        case DELETE:
3089          metricsRegionServer.updateCheckAndDelete(region.getRegionInfo().getTable(),
3090            after - before);
3091          break;
3092        default:
3093          break;
3094      }
3095    }
3096    return result;
3097  }
3098
3099  // This is used to keep compatibility with the old client implementation. Consider removing it
3100  // if we decide to drop support for clients that still send a close request to a region scanner
3101  // which has already been exhausted.
3102  @Deprecated
3103  private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {
3104
3105    private static final long serialVersionUID = -4305297078988180130L;
3106
3107    @Override
3108    public synchronized Throwable fillInStackTrace() {
3109      return this;
3110    }
3111  };
3112
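  /**
   * Look up the RegionScannerHolder for the scanner id in the request. Throws the cached
   * SCANNER_ALREADY_CLOSED marker or an UnknownScannerException when the scanner is gone, and
   * closes the scanner and throws NotServingRegionException if the region it was opened against is
   * no longer the online instance.
   */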
3113  private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
3114    String scannerName = toScannerName(request.getScannerId());
3115    RegionScannerHolder rsh = this.scanners.get(scannerName);
3116    if (rsh == null) {
3117      // just ignore the next or close request if the scanner does not exist.
3118      if (closedScanners.getIfPresent(scannerName) != null) {
3119        throw SCANNER_ALREADY_CLOSED;
3120      } else {
3121        LOG.warn("Client tried to access missing scanner " + scannerName);
3122        throw new UnknownScannerException(
3123          "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
3124            + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
3125            + "long wait between consecutive client checkins, c) Server may be closing down, "
3126            + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
3127            + "possible fix would be increasing the value of"
3128            + "'hbase.client.scanner.timeout.period' configuration.");
3129      }
3130    }
3131    RegionInfo hri = rsh.s.getRegionInfo();
3132    // Yes, should be the same instance
3133    if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) {
3134      String msg = "Region has changed on the scanner " + scannerName + ": regionName="
3135        + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r;
3136      LOG.warn(msg + ", closing...");
3137      scanners.remove(scannerName);
3138      try {
3139        rsh.s.close();
3140      } catch (IOException e) {
3141        LOG.warn("Getting exception closing " + scannerName, e);
3142      } finally {
3143        try {
3144          regionServer.getLeaseManager().cancelLease(scannerName);
3145        } catch (LeaseException e) {
3146          LOG.warn("Getting exception closing " + scannerName, e);
3147        }
3148      }
3149      throw new NotServingRegionException(msg);
3150    }
3151    return rsh;
3152  }
3153
3154  /**
3155   * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
3156   *         value.
3157   */
3158  private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request,
3159    ScanResponse.Builder builder) throws IOException {
3160    HRegion region = getRegion(request.getRegion());
3161    ClientProtos.Scan protoScan = request.getScan();
3162    boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
3163    Scan scan = ProtobufUtil.toScan(protoScan);
3164    // if the request doesn't set this, get the default region setting.
3165    if (!isLoadingCfsOnDemandSet) {
3166      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
3167    }
3168
3169    if (!scan.hasFamilies()) {
3170      // Adding all families to scanner
3171      for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
3172        scan.addFamily(family);
3173      }
3174    }
3175    if (region.getCoprocessorHost() != null) {
3176      // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
3177      // wrapper for the core created RegionScanner
3178      region.getCoprocessorHost().preScannerOpen(scan);
3179    }
3180    RegionScannerImpl coreScanner = region.getScanner(scan);
3181    Shipper shipper = coreScanner;
3182    RegionScanner scanner = coreScanner;
3183    try {
3184      if (region.getCoprocessorHost() != null) {
3185        scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
3186      }
3187    } catch (Exception e) {
3188      // Although region coprocessor is for advanced users and they should take care of the
3189      // implementation to not damage the HBase system, closing the scanner on exception here does
3190      // not have any bad side effect, so let's do it
3191      scanner.close();
3192      throw e;
3193    }
3194    long scannerId = scannerIdGenerator.generateNewScannerId();
3195    builder.setScannerId(scannerId);
3196    builder.setMvccReadPoint(scanner.getMvccReadPoint());
3197    builder.setTtl(scannerLeaseTimeoutPeriod);
3198    String scannerName = toScannerName(scannerId);
3199
3200    boolean fullRegionScan =
3201      !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region);
3202
3203    return new Pair<String, RegionScannerHolder>(scannerName,
3204      addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan));
3205  }
3206
3207  /**
3208   * The returned String is used as the key when looking up outstanding Scanners in this Server's
3209   * this.scanners, the Map of outstanding scanners and their current state.
3210   * @param scannerId A scanner long id.
3211   * @return The long id as a String.
3212   */
3213  private static String toScannerName(long scannerId) {
3214    return Long.toString(scannerId);
3215  }
3216
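  /** Verify that the nextCallSeq carried by the request matches the sequence the scanner expects. */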
3217  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
3218    throws OutOfOrderScannerNextException {
3219    // If nextCallSeq does not match, throw the Exception straight away. This needs to be
3220    // performed even before checking the Lease.
3221    // See HBASE-5974
3222    if (request.hasNextCallSeq()) {
3223      long callSeq = request.getNextCallSeq();
3224      if (!rsh.incNextCallSeq(callSeq)) {
3225        throw new OutOfOrderScannerNextException(
3226          "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: "
3227            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
3228      }
3229    }
3230  }
3231
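  /** Re-register a scanner lease that was removed while its request was being processed. */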
3232  private void addScannerLeaseBack(LeaseManager.Lease lease) {
3233    try {
3234      regionServer.getLeaseManager().addLease(lease);
3235    } catch (LeaseStillHeldException e) {
3236      // should not happen as the scanner id is unique.
3237      throw new AssertionError(e);
3238    }
3239  }
3240
3241  // visible for testing only
3242  long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller,
3243    boolean allowHeartbeatMessages) {
3244    // Set the time limit to be half of the more restrictive timeout value (one of the
3245    // timeout values must be positive). In the event that both values are positive, the
3246    // more restrictive of the two is used to calculate the limit.
3247    if (allowHeartbeatMessages) {
3248      long now = EnvironmentEdgeManager.currentTime();
3249      long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now);
3250      if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) {
3251        long timeLimitDelta;
3252        if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) {
3253          timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout);
3254        } else {
3255          timeLimitDelta =
3256            scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout;
3257        }
3258
3259        // Use half of whichever timeout value was more restrictive... But don't allow
3260        // the time limit to be less than the allowable minimum (could cause an
3261        // immediate timeout before scanning any data).
3262        timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
3263        return now + timeLimitDelta;
3264      }
3265    }
3266    // Default value of timeLimit is negative to indicate no timeLimit should be
3267    // enforced.
3268    return -1L;
3269  }
3270
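  /**
   * Compute how much of the client's rpc timeout is left after the time already spent on the call,
   * clamped below at minimumScanTimeLimitDelta; returns -1 when no timeout is configured.
   */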
3271  private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) {
3272    long timeout;
3273    if (controller != null && controller.getCallTimeout() > 0) {
3274      timeout = controller.getCallTimeout();
3275    } else if (rpcTimeout > 0) {
3276      timeout = rpcTimeout;
3277    } else {
3278      return -1;
3279    }
3280    if (call != null) {
3281      timeout -= (now - call.getReceiveTime());
3282    }
3283    // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high.
3284    // return minimum value here in that case so we count this in calculating the final delta.
3285    return Math.max(minimumScanTimeLimitDelta, timeout);
3286  }
3287
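  /** Mark the response as having no more results once the row limit for this scan has been reached. */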
3288  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
3289    ScannerContext scannerContext, ScanResponse.Builder builder) {
3290    if (numOfCompleteRows >= limitOfRows) {
3291      if (LOG.isTraceEnabled()) {
3292        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows
3293          + " scannerContext: " + scannerContext);
3294      }
3295      builder.setMoreResults(false);
3296    }
3297  }
3298
3299  // Scans and fills 'results'; records in the builder whether we have more results in the region.
3300  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
3301    long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
3302    ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCall rpcCall)
3303    throws IOException {
3304    HRegion region = rsh.r;
3305    RegionScanner scanner = rsh.s;
3306    long maxResultSize;
3307    if (scanner.getMaxResultSize() > 0) {
3308      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
3309    } else {
3310      maxResultSize = maxQuotaResultSize;
3311    }
3312    // This holds the cells inside one row. The default ArrayList size is 10, so with many
3313    // versions or many cfs we would resize; resizings show up in the profiler. Set it higher than
3314    // 10; for now an arbitrary 32. TODO: keep a record of the general size of returned results.
3315    ArrayList<Cell> values = new ArrayList<>(32);
3316    region.startRegionOperation(Operation.SCAN);
3317    long before = EnvironmentEdgeManager.currentTime();
3318    // Used to check if we've matched the row limit set on the Scan
3319    int numOfCompleteRows = 0;
3320    // Count of times we call nextRaw; can be > numOfCompleteRows.
3321    int numOfNextRawCalls = 0;
3322    try {
3323      int numOfResults = 0;
3324      synchronized (scanner) {
3325        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
3326        boolean clientHandlesPartials =
3327          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
3328        boolean clientHandlesHeartbeats =
3329          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
3330
3331        // On the server side we must ensure that the correct ordering of partial results is
3332        // returned to the client to allow them to properly reconstruct the partial results.
3333        // If the coprocessor host is adding to the result list, we cannot guarantee the
3334        // correct ordering of partial results and so we prevent partial results from being
3335        // formed.
3336        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
3337        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
3338        boolean moreRows = false;
3339
3340        // Heartbeat messages occur when the processing of the ScanRequest exceeds a
3341        // certain time threshold on the server. When the time threshold is exceeded, the
3342        // server stops the scan and sends back whatever Results it has accumulated within
3343        // that time period (may be empty). Since heartbeat messages have the potential to
3344        // create partial Results (in the event that the timeout occurs in the middle of a
3345        // row), we must only generate heartbeat messages when the client can handle both
3346        // heartbeats AND partials
3347        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
3348
3349        long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);
3350
3351        final LimitScope sizeScope =
3352          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
3353        final LimitScope timeScope =
3354          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
3355
3356        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
3357
3358        // Configure with limits for this RPC. Set keep progress true since size progress
3359        // towards size limit should be kept between calls to nextRaw
3360        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
3361        // maxResultSize - we stop either when the data size of all cells read reaches this much,
3362        // or when the heap size occupied by those cells does. Cell data means its key and value parts.
3363        contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize);
3364        contextBuilder.setBatchLimit(scanner.getBatch());
3365        contextBuilder.setTimeLimit(timeScope, timeLimit);
3366        contextBuilder.setTrackMetrics(trackMetrics);
3367        ScannerContext scannerContext = contextBuilder.build();
3368        boolean limitReached = false;
3369        while (numOfResults < maxResults) {
3370          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
3371          // batch limit is a limit on the number of cells per Result. Thus, if progress is
3372          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
3373          // reset the batch progress between nextRaw invocations since we don't want the
3374          // batch progress from previous calls to affect future calls
3375          scannerContext.setBatchProgress(0);
3376          assert values.isEmpty();
3377
3378          // Collect values to be returned here
3379          moreRows = scanner.nextRaw(values, scannerContext);
3380          if (rpcCall == null) {
3381            // When there is no RpcCallContext, copy the cells to the heap before the scanner is
3382            // closed. This can be an EXPENSIVE call. It may make an extra copy from offheap to
3383            // onheap buffers. See more details in HBASE-26036.
3384            CellUtil.cloneIfNecessary(values);
3385          }
3386          numOfNextRawCalls++;
3387
3388          if (!values.isEmpty()) {
3389            if (limitOfRows > 0) {
3390              // First we need to check if the last result is partial and we have a row change. If
3391              // so then we need to increase the numOfCompleteRows.
3392              if (results.isEmpty()) {
3393                if (
3394                  rsh.rowOfLastPartialResult != null
3395                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
3396                ) {
3397                  numOfCompleteRows++;
3398                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
3399                    builder);
3400                }
3401              } else {
3402                Result lastResult = results.get(results.size() - 1);
3403                if (
3404                  lastResult.mayHaveMoreCellsInRow()
3405                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
3406                ) {
3407                  numOfCompleteRows++;
3408                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
3409                    builder);
3410                }
3411              }
3412              if (builder.hasMoreResults() && !builder.getMoreResults()) {
3413                break;
3414              }
3415            }
3416            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
3417            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
3418            lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
3419            results.add(r);
3420            numOfResults++;
3421            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
3422              numOfCompleteRows++;
3423              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
3424              if (builder.hasMoreResults() && !builder.getMoreResults()) {
3425                break;
3426              }
3427            }
3428          } else if (!moreRows && !results.isEmpty()) {
3429            // No more cells for this scan, so we need to ensure that mayHaveMoreCellsInRow of the
3430            // last result is false. Otherwise it is possible that the first nextRaw returned
3431            // because of BATCH_LIMIT_REACHED (and happened to exhaust all cells of the scan), so
3432            // the last result's mayHaveMoreCellsInRow would be true, while the following nextRaw
3433            // returns with moreRows=false, which means moreResultsInRegion would be false; that
3434            // would be a contradictory state (HBASE-21206).
3435            int lastIdx = results.size() - 1;
3436            Result r = results.get(lastIdx);
3437            if (r.mayHaveMoreCellsInRow()) {
3438              results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
3439            }
3440          }
3441          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
3442          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
3443          boolean resultsLimitReached = numOfResults >= maxResults;
3444          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;
3445
3446          if (limitReached || !moreRows) {
3447            // We only want to mark a ScanResponse as a heartbeat message in the event that
3448            // there are more values to be read server side. If there aren't more values,
3449            // marking it as a heartbeat is wasteful because the client will need to issue
3450            // another ScanRequest only to realize that they already have all the values
3451            if (moreRows && timeLimitReached) {
3452              // Heartbeat messages occur when the time limit has been reached.
3453              builder.setHeartbeatMessage(true);
3454              if (rsh.needCursor) {
3455                Cell cursorCell = scannerContext.getLastPeekedCell();
3456                if (cursorCell != null) {
3457                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
3458                }
3459              }
3460            }
3461            break;
3462          }
3463          values.clear();
3464        }
3465        builder.setMoreResultsInRegion(moreRows);
3466        // Check to see if the client requested that we track metrics server side. If the
3467        // client requested metrics, retrieve the metrics from the scanner context.
3468        if (trackMetrics) {
3469          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
3470          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
3471          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
3472
3473          for (Entry<String, Long> entry : metrics.entrySet()) {
3474            pairBuilder.setName(entry.getKey());
3475            pairBuilder.setValue(entry.getValue());
3476            metricBuilder.addMetrics(pairBuilder.build());
3477          }
3478
3479          builder.setScanMetrics(metricBuilder.build());
3480        }
3481      }
3482    } finally {
3483      region.closeRegionOperation();
3484      // Update serverside metrics, even on error.
3485      long end = EnvironmentEdgeManager.currentTime();
3486      long responseCellSize = rpcCall != null ? rpcCall.getResponseCellSize() : 0;
3487      region.getMetrics().updateScanTime(end - before);
3488      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
3489      if (metricsRegionServer != null) {
3490        metricsRegionServer.updateScanSize(region.getTableDescriptor().getTableName(),
3491          responseCellSize);
3492        metricsRegionServer.updateScanTime(region.getTableDescriptor().getTableName(),
3493          end - before);
3494        metricsRegionServer.updateReadQueryMeter(region.getRegionInfo().getTable(),
3495          numOfNextRawCalls);
3496      }
3497    }
3498    // coprocessor postNext hook
3499    if (region.getCoprocessorHost() != null) {
3500      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
3501    }
3502  }
3503
3504  /**
3505   * Scan data in a table.
3506   * @param controller the RPC controller
3507   * @param request    the scan request
3508   */
3509  @Override
3510  public ScanResponse scan(final RpcController controller, final ScanRequest request)
3511    throws ServiceException {
3512    if (controller != null && !(controller instanceof HBaseRpcController)) {
3513      throw new UnsupportedOperationException(
3514        "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
3515    }
3516    if (!request.hasScannerId() && !request.hasScan()) {
3517      throw new ServiceException(
3518        new DoNotRetryIOException("Missing required input: scannerId or scan"));
3519    }
3520    try {
3521      checkOpen();
3522    } catch (IOException e) {
3523      if (request.hasScannerId()) {
3524        String scannerName = toScannerName(request.getScannerId());
3525        if (LOG.isDebugEnabled()) {
3526          LOG.debug(
3527            "Server shutting down and client tried to access missing scanner " + scannerName);
3528        }
3529        final LeaseManager leaseManager = regionServer.getLeaseManager();
3530        if (leaseManager != null) {
3531          try {
3532            leaseManager.cancelLease(scannerName);
3533          } catch (LeaseException le) {
3534            // No problem, ignore
3535            if (LOG.isTraceEnabled()) {
              LOG.trace("Unable to cancel lease of scanner. It could already be closed.");
3537            }
3538          }
3539        }
3540      }
3541      throw new ServiceException(e);
3542    }
3543    requestCount.increment();
3544    rpcScanRequestCount.increment();
3545    RegionScannerHolder rsh;
3546    ScanResponse.Builder builder = ScanResponse.newBuilder();
3547    String scannerName;
3548    try {
3549      if (request.hasScannerId()) {
        // Downstream projects such as AsyncHBase (used by OpenTSDB) need this value echoed back.
        // See HBASE-18000 for more details.
3552        long scannerId = request.getScannerId();
3553        builder.setScannerId(scannerId);
3554        scannerName = toScannerName(scannerId);
3555        rsh = getRegionScanner(request);
3556      } else {
3557        Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder);
3558        scannerName = scannerNameAndRSH.getFirst();
3559        rsh = scannerNameAndRSH.getSecond();
3560      }
3561    } catch (IOException e) {
3562      if (e == SCANNER_ALREADY_CLOSED) {
        // Now we close the scanner automatically once there are no more results for the region,
        // but an old client will still send a close request to us. Just ignore it and return.
3565        return builder.build();
3566      }
3567      throw new ServiceException(e);
3568    }
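    // Scans flagged as covering the whole region are counted separately.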
3569    if (rsh.fullRegionScan) {
3570      rpcFullScanRequestCount.increment();
3571    }
3572    HRegion region = rsh.r;
3573    LeaseManager.Lease lease;
3574    try {
      // Remove the lease while the request is being processed on the server; this protects
      // against the case where processing takes longer than the lease expiration time. Throws a
      // LeaseException if no lease is found for this scanner.
3577      lease = regionServer.getLeaseManager().removeLease(scannerName);
3578    } catch (LeaseException e) {
3579      throw new ServiceException(e);
3580    }
3581    if (request.hasRenew() && request.getRenew()) {
3582      // add back and return
3583      addScannerLeaseBack(lease);
3584      try {
3585        checkScanNextCallSeq(request, rsh);
3586      } catch (OutOfOrderScannerNextException e) {
3587        throw new ServiceException(e);
3588      }
3589      return builder.build();
3590    }
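    // Check the scan read quota up front; if it cannot be satisfied, put the lease back and fail
    // the call.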
3591    OperationQuota quota;
3592    try {
3593      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
3594    } catch (IOException e) {
3595      addScannerLeaseBack(lease);
3596      throw new ServiceException(e);
3597    }
3598    try {
3599      checkScanNextCallSeq(request, rsh);
3600    } catch (OutOfOrderScannerNextException e) {
3601      addScannerLeaseBack(lease);
3602      throw new ServiceException(e);
3603    }
    // Now we have increased the next call sequence. If we give the client an error, the retry
    // will never succeed, so we had better close the scanner and return a DoNotRetryIOException
    // to the client; the client will then open a new scanner.
3607    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
3608    int rows; // this is scan.getCaching
3609    if (request.hasNumberOfRows()) {
3610      rows = request.getNumberOfRows();
3611    } else {
3612      rows = closeScanner ? 0 : 1;
3613    }
3614    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
3615    // now let's do the real scan.
3616    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
3617    RegionScanner scanner = rsh.s;
    // This is the row limit for this scan; once the number of returned rows reaches this value,
    // we will close the scanner.
3620    int limitOfRows;
3621    if (request.hasLimitOfRows()) {
3622      limitOfRows = request.getLimitOfRows();
3623    } else {
3624      limitOfRows = -1;
3625    }
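    // Tracks the last cell block referenced by addSize so that a backing block shared by
    // consecutive results is only accounted once in the response size.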
3626    MutableObject<Object> lastBlock = new MutableObject<>();
3627    boolean scannerClosed = false;
3628    try {
3629      List<Result> results = new ArrayList<>(Math.min(rows, 512));
3630      if (rows > 0) {
3631        boolean done = false;
3632        // Call coprocessor. Get region info from scanner.
3633        if (region.getCoprocessorHost() != null) {
3634          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
3635          if (!results.isEmpty()) {
3636            for (Result r : results) {
3637              lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
3638            }
3639          }
3640          if (bypass != null && bypass.booleanValue()) {
3641            done = true;
3642          }
3643        }
3644        if (!done) {
3645          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
3646            results, builder, lastBlock, rpcCall);
3647        } else {
3648          builder.setMoreResultsInRegion(!results.isEmpty());
3649        }
3650      } else {
        // This is an open scanner call with numberOfRows = 0, so set moreResultsInRegion to true.
3652        builder.setMoreResultsInRegion(true);
3653      }
3654
3655      quota.addScanResult(results);
3656      addResults(builder, results, (HBaseRpcController) controller,
3657        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
3658        isClientCellBlockSupport(rpcCall));
3659      if (scanner.isFilterDone() && results.isEmpty()) {
        // If the scanner's filter (if any) is done with the scan, only set moreResults to false
        // when the results are empty. This keeps compatibility with the old scan implementation,
        // where the returned results are simply ignored when moreResults is false. The isEmpty
        // check can be removed once the old implementation is gone.
3664        builder.setMoreResults(false);
3665      }
3666      // Later we may close the scanner depending on this flag so here we need to make sure that we
3667      // have already set this flag.
3668      assert builder.hasMoreResultsInRegion();
3669      // we only set moreResults to false in the above code, so set it to true if we haven't set it
3670      // yet.
3671      if (!builder.hasMoreResults()) {
3672        builder.setMoreResults(true);
3673      }
3674      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
        // Record the last cell of the last result if it is a partial result.
        // We need this to count the complete rows returned to the client: mayHaveMoreCellsInRow
        // being true does not guarantee that there are extra cells for the current row, since we
        // may filter out all of its remaining cells and return only the cells of the next row on
        // the next RegionScanner.nextRaw call. So we need to check for a row change here.
3681        Result lastResult = results.get(results.size() - 1);
3682        if (lastResult.mayHaveMoreCellsInRow()) {
3683          rsh.rowOfLastPartialResult = lastResult.getRow();
3684        } else {
3685          rsh.rowOfLastPartialResult = null;
3686        }
3687      }
3688      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
3689        scannerClosed = true;
3690        closeScanner(region, scanner, scannerName, rpcCall);
3691      }
3692
      // There's no point returning to a timed-out client. Throwing ensures the scanner is closed.
3694      if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) {
3695        throw new TimeoutIOException("Client deadline exceeded, cannot return results");
3696      }
3697
3698      return builder.build();
3699    } catch (IOException e) {
3700      try {
        // The scanner is closed here.
        scannerClosed = true;
        // The scanner might be left in a dirty state, so we tell the client to fail this RPC and
        // close its scanner, then open a new one starting from the last row it has seen.
3706        closeScanner(region, scanner, scannerName, rpcCall);
3707
3708        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
3709        // used in two different semantics.
3710        // (1) The first is to close the client scanner and bubble up the exception all the way
3711        // to the application. This is preferred when the exception is really un-recoverable
3712        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
3713        // bucket usually.
3714        // (2) Second semantics is to close the current region scanner only, but continue the
3715        // client scanner by overriding the exception. This is usually UnknownScannerException,
3716        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
3717        // application-level ClientScanner has to continue without bubbling up the exception to
3718        // the client. See ClientScanner code to see how it deals with these special exceptions.
3719        if (e instanceof DoNotRetryIOException) {
3720          throw e;
3721        }
3722
3723        // If it is a FileNotFoundException, wrap as a
3724        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
3725        if (e instanceof FileNotFoundException) {
3726          throw new DoNotRetryIOException(e);
3727        }
3728
        // We have already closed the scanner. Instead of throwing the IOException and having the
        // client retry with the same scannerId only to get an UnknownScannerException on the next
        // RPC, we directly throw a special exception to save an RPC round trip.
3732        if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
3733          // 1.4.0+ clients know how to handle
3734          throw new ScannerResetException("Scanner is closed on the server-side", e);
3735        } else {
          // Older clients do not know about ScannerResetException; just throw
          // UnknownScannerException, which they know how to handle.
          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
            + " scanner state for clients older than 1.4.", e);
3739        }
3740      } catch (IOException ioe) {
3741        throw new ServiceException(ioe);
3742      }
3743    } finally {
3744      if (!scannerClosed) {
        // Adding the lease back resets its expiration time.
        // The closeCallBack is set in closeScanner, so here we only care about shippedCallback.
3747        if (rpcCall != null) {
3748          rpcCall.setCallBack(rsh.shippedCallback);
3749        } else {
          // If the RPC call context is null, call rsh.shippedCallback directly to reuse its logic
          // for releasing the internal resources in rsh; the lease is also added back to the
          // region server's LeaseManager in rsh.shippedCallback.
3753          runShippedCallback(rsh);
3754        }
3755      }
3756      quota.close();
3757    }
3758  }
3759
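  /**
   * Runs the shipped callback of the given scanner holder directly, wrapping any IOException in a
   * ServiceException. Used when there is no RPC call context to attach the callback to, so that
   * the holder's internal resources are still released and its lease is added back.
   */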
3760  private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
3761    assert rsh.shippedCallback != null;
3762    try {
3763      rsh.shippedCallback.run();
3764    } catch (IOException ioe) {
3765      throw new ServiceException(ioe);
3766    }
3767  }
3768
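  /**
   * Closes a region scanner and removes it from the scanner registry. Invokes the preScannerClose
   * and postScannerClose coprocessor hooks; a coprocessor may bypass the actual close. When an RPC
   * call context is supplied, the real close work is deferred to the holder's close callback,
   * otherwise the scanner is closed inline.
   */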
3769  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
3770    RpcCallContext context) throws IOException {
3771    if (region.getCoprocessorHost() != null) {
3772      if (region.getCoprocessorHost().preScannerClose(scanner)) {
3773        // bypass the actual close.
3774        return;
3775      }
3776    }
3777    RegionScannerHolder rsh = scanners.remove(scannerName);
3778    if (rsh != null) {
3779      if (context != null) {
3780        context.setCallBack(rsh.closeCallBack);
3781      } else {
3782        rsh.s.close();
3783      }
3784      if (region.getCoprocessorHost() != null) {
3785        region.getCoprocessorHost().postScannerClose(scanner);
3786      }
3787      closedScanners.put(scannerName, scannerName);
3788    }
3789  }
3790
3791  @Override
3792  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
3793    CoprocessorServiceRequest request) throws ServiceException {
3794    rpcPreCheck("execRegionServerService");
3795    return regionServer.execRegionServerService(controller, request);
3796  }
3797
3798  @Override
3799  public UpdateConfigurationResponse updateConfiguration(RpcController controller,
3800    UpdateConfigurationRequest request) throws ServiceException {
3801    try {
3802      requirePermission("updateConfiguration", Permission.Action.ADMIN);
3803      this.regionServer.updateConfiguration();
3804    } catch (Exception e) {
3805      throw new ServiceException(e);
3806    }
3807    return UpdateConfigurationResponse.getDefaultInstance();
3808  }
3809
3810  @Override
3811  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
3812    GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
3813    try {
3814      final RegionServerSpaceQuotaManager manager = regionServer.getRegionServerSpaceQuotaManager();
3815      final GetSpaceQuotaSnapshotsResponse.Builder builder =
3816        GetSpaceQuotaSnapshotsResponse.newBuilder();
3817      if (manager != null) {
3818        final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
3819        for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
3820          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
3821            .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
3822            .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build());
3823        }
3824      }
3825      return builder.build();
3826    } catch (Exception e) {
3827      throw new ServiceException(e);
3828    }
3829  }
3830
3831  @Override
3832  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
3833    ClearRegionBlockCacheRequest request) throws ServiceException {
3834    rpcPreCheck("clearRegionBlockCache");
3835    ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder();
3836    CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
3837    List<HRegion> regions = getRegions(request.getRegionList(), stats);
3838    for (HRegion region : regions) {
3839      try {
3840        stats = stats.append(this.regionServer.clearRegionBlockCache(region));
3841      } catch (Exception e) {
3842        stats.addException(region.getRegionInfo().getRegionName(), e);
3843      }
3844    }
3845    stats.withMaxCacheSize(regionServer.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
3846    return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
3847  }
3848
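  /**
   * Submits an AssignRegionHandler for every region in the open request. Table descriptors are
   * looked up through the supplied cache so that the same descriptor is not re-read from the
   * filesystem for every region of a table; a failed lookup is retried later in the handler.
   */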
3849  private void executeOpenRegionProcedures(OpenRegionRequest request,
3850    Map<TableName, TableDescriptor> tdCache) {
3851    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
3852    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
3853      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
3854      TableName tableName = regionInfo.getTable();
3855      TableDescriptor tableDesc = tdCache.get(tableName);
3856      if (tableDesc == null) {
3857        try {
3858          tableDesc = regionServer.getTableDescriptors().get(regionInfo.getTable());
3859        } catch (IOException e) {
          // Here we do not fail the whole method since we also need to deal with the other
          // procedures, and we can not simply ignore this one, so we still schedule an
          // AssignRegionHandler, which will report back to the master if we still can not get the
          // TableDescriptor.
3864          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
3865            regionInfo.getTable(), e);
3866        }
3867        if (tableDesc != null) {
3868          tdCache.put(tableName, tableDesc);
3869        }
3870      }
3871      if (regionOpenInfo.getFavoredNodesCount() > 0) {
3872        regionServer.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
3873          regionOpenInfo.getFavoredNodesList());
3874      }
3875      long procId = regionOpenInfo.getOpenProcId();
3876      if (regionServer.submitRegionProcedure(procId)) {
3877        regionServer.executorService.submit(AssignRegionHandler.create(regionServer, regionInfo,
3878          procId, tableDesc, masterSystemTime));
3879      }
3880    }
3881  }
3882
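  /**
   * Submits an UnassignRegionHandler for the region named in the close request, unless the
   * procedure has already been submitted to this region server.
   */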
3883  private void executeCloseRegionProcedures(CloseRegionRequest request) {
3884    String encodedName;
3885    try {
3886      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
3887    } catch (DoNotRetryIOException e) {
3888      throw new UncheckedIOException("Should not happen", e);
3889    }
3890    ServerName destination = request.hasDestinationServer()
3891      ? ProtobufUtil.toServerName(request.getDestinationServer())
3892      : null;
3893    long procId = request.getCloseProcId();
3894    if (regionServer.submitRegionProcedure(procId)) {
3895      regionServer.executorService.submit(
3896        UnassignRegionHandler.create(regionServer, encodedName, procId, false, destination));
3897    }
3898  }
3899
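  /**
   * Instantiates the RSProcedureCallable named in the request via reflection and hands it off to
   * the region server for execution. If instantiation fails, the error is reported back to the
   * master right away via remoteProcedureComplete.
   */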
3900  private void executeProcedures(RemoteProcedureRequest request) {
3901    RSProcedureCallable callable;
3902    try {
3903      callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
3904        .getDeclaredConstructor().newInstance();
3905    } catch (Exception e) {
      LOG.warn("Failed to instantiate remote procedure {}, pid={}", request.getProcClass(),
3907        request.getProcId(), e);
3908      regionServer.remoteProcedureComplete(request.getProcId(), e);
3909      return;
3910    }
3911    callable.init(request.getProcData().toByteArray(), regionServer);
3912    LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
3913    regionServer.executeProcedure(request.getProcId(), callable);
3914  }
3915
3916  @Override
3917  @QosPriority(priority = HConstants.ADMIN_QOS)
3918  public ExecuteProceduresResponse executeProcedures(RpcController controller,
3919    ExecuteProceduresRequest request) throws ServiceException {
3920    try {
3921      checkOpen();
3922      throwOnWrongStartCode(request);
3923      regionServer.getRegionServerCoprocessorHost().preExecuteProcedures();
3924      if (request.getOpenRegionCount() > 0) {
        // Avoid reading the TableDescriptor every time (usually it would be read from the file
        // system).
3927        Map<TableName, TableDescriptor> tdCache = new HashMap<>();
3928        request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
3929      }
3930      if (request.getCloseRegionCount() > 0) {
3931        request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
3932      }
3933      if (request.getProcCount() > 0) {
3934        request.getProcList().forEach(this::executeProcedures);
3935      }
3936      regionServer.getRegionServerCoprocessorHost().postExecuteProcedures();
3937      return ExecuteProceduresResponse.getDefaultInstance();
3938    } catch (IOException e) {
3939      throw new ServiceException(e);
3940    }
3941  }
3942
3943  @Override
3944  @QosPriority(priority = HConstants.ADMIN_QOS)
3945  public SlowLogResponses getSlowLogResponses(final RpcController controller,
3946    final SlowLogResponseRequest request) {
3947    final NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder();
3948    final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder);
3949    SlowLogResponses slowLogResponses =
3950      SlowLogResponses.newBuilder().addAllSlowLogPayloads(slowLogPayloads).build();
3951    return slowLogResponses;
3952  }
3953
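  /**
   * Fetches slow/large log payloads from the named queue recorder for the given request,
   * returning an empty list if the recorder is not available or has no matching records.
   */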
3954  private List<SlowLogPayload> getSlowLogPayloads(SlowLogResponseRequest request,
3955    NamedQueueRecorder namedQueueRecorder) {
3956    if (namedQueueRecorder == null) {
3957      return Collections.emptyList();
3958    }
3959    List<SlowLogPayload> slowLogPayloads;
3960    NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
3961    namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
3962    namedQueueGetRequest.setSlowLogResponseRequest(request);
3963    NamedQueueGetResponse namedQueueGetResponse =
3964      namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
3965    slowLogPayloads = namedQueueGetResponse != null
3966      ? namedQueueGetResponse.getSlowLogPayloads()
3967      : Collections.emptyList();
3968    return slowLogPayloads;
3969  }
3970
3971  @Override
3972  @QosPriority(priority = HConstants.ADMIN_QOS)
3973  public SlowLogResponses getLargeLogResponses(final RpcController controller,
3974    final SlowLogResponseRequest request) {
3975    final NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder();
3976    final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder);
3977    SlowLogResponses slowLogResponses =
3978      SlowLogResponses.newBuilder().addAllSlowLogPayloads(slowLogPayloads).build();
3979    return slowLogResponses;
3980  }
3981
3982  @Override
3983  @QosPriority(priority = HConstants.ADMIN_QOS)
3984  public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller,
3985    final ClearSlowLogResponseRequest request) throws ServiceException {
3986    rpcPreCheck("clearSlowLogsResponses");
3987    final NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder();
3988    boolean slowLogsCleaned = Optional.ofNullable(namedQueueRecorder)
3989      .map(
3990        queueRecorder -> queueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG))
3991      .orElse(false);
3992    ClearSlowLogResponses clearSlowLogResponses =
3993      ClearSlowLogResponses.newBuilder().setIsCleaned(slowLogsCleaned).build();
3994    return clearSlowLogResponses;
3995  }
3996
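  /**
   * Returns the requested log entries. Only slow/large log requests (SlowLogResponseRequest) are
   * currently supported here; the request payload is parsed reflectively through the message's
   * parseFrom method, and any other log class results in a ServiceException.
   */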
3997  @Override
3998  public HBaseProtos.LogEntry getLogEntries(RpcController controller,
3999    HBaseProtos.LogRequest request) throws ServiceException {
4000    try {
4001      final String logClassName = request.getLogClassName();
4002      Class<?> logClass = Class.forName(logClassName).asSubclass(Message.class);
4003      Method method = logClass.getMethod("parseFrom", ByteString.class);
4004      if (logClassName.contains("SlowLogResponseRequest")) {
4005        SlowLogResponseRequest slowLogResponseRequest =
4006          (SlowLogResponseRequest) method.invoke(null, request.getLogMessage());
4007        final NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder();
4008        final List<SlowLogPayload> slowLogPayloads =
4009          getSlowLogPayloads(slowLogResponseRequest, namedQueueRecorder);
4010        SlowLogResponses slowLogResponses =
4011          SlowLogResponses.newBuilder().addAllSlowLogPayloads(slowLogPayloads).build();
4012        return HBaseProtos.LogEntry.newBuilder()
4013          .setLogClassName(slowLogResponses.getClass().getName())
4014          .setLogMessage(slowLogResponses.toByteString()).build();
4015      }
4016    } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
4017      | InvocationTargetException e) {
4018      LOG.error("Error while retrieving log entries.", e);
4019      throw new ServiceException(e);
4020    }
4021    throw new ServiceException("Invalid request params");
4022  }
4023
4024  public RpcScheduler getRpcScheduler() {
4025    return rpcServer.getScheduler();
4026  }
4027
4028  protected AccessChecker getAccessChecker() {
4029    return accessChecker;
4030  }
4031
4032  protected ZKPermissionWatcher getZkPermissionWatcher() {
4033    return zkPermissionWatcher;
4034  }
4035
4036  @Override
4037  public GetClusterIdResponse getClusterId(RpcController controller, GetClusterIdRequest request)
4038    throws ServiceException {
4039    return GetClusterIdResponse.newBuilder().setClusterId(regionServer.getClusterId()).build();
4040  }
4041
4042  @Override
4043  public GetActiveMasterResponse getActiveMaster(RpcController controller,
4044    GetActiveMasterRequest request) throws ServiceException {
4045    GetActiveMasterResponse.Builder builder = GetActiveMasterResponse.newBuilder();
4046    regionServer.getActiveMaster()
4047      .ifPresent(name -> builder.setServerName(ProtobufUtil.toServerName(name)));
4048    return builder.build();
4049  }
4050
4051  @Override
4052  public GetMastersResponse getMasters(RpcController controller, GetMastersRequest request)
4053    throws ServiceException {
4054    GetMastersResponse.Builder builder = GetMastersResponse.newBuilder();
4055    regionServer.getActiveMaster()
4056      .ifPresent(activeMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder()
4057        .setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true)));
4058    regionServer.getBackupMasters()
4059      .forEach(backupMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder()
4060        .setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false)));
4061    return builder.build();
4062  }
4063
4064  @Override
4065  public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController controller,
4066    GetMetaRegionLocationsRequest request) throws ServiceException {
4067    GetMetaRegionLocationsResponse.Builder builder = GetMetaRegionLocationsResponse.newBuilder();
4068    Optional<List<HRegionLocation>> metaLocations =
4069      regionServer.getMetaRegionLocationCache().getMetaRegionLocations();
4070    metaLocations.ifPresent(hRegionLocations -> hRegionLocations
4071      .forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location))));
4072    return builder.build();
4073  }
4074
4075  @Override
4076  public final GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
4077    GetBootstrapNodesRequest request) throws ServiceException {
4078    int maxNodeCount = regionServer.getConfiguration().getInt(CLIENT_BOOTSTRAP_NODE_LIMIT,
4079      DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
4080    ReservoirSample<ServerName> sample = new ReservoirSample<>(maxNodeCount);
4081    sample.add(regionServer.getRegionServers());
4082
4083    GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
4084    sample.getSamplingResult().stream().map(ProtobufUtil::toServerName)
4085      .forEach(builder::addServerName);
4086    return builder.build();
4087  }
4088}