001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.io.UncheckedIOException;
024import java.net.BindException;
025import java.net.InetAddress;
026import java.net.InetSocketAddress;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.Iterator;
032import java.util.List;
033import java.util.Map;
034import java.util.Map.Entry;
035import java.util.NavigableMap;
036import java.util.Set;
037import java.util.TreeSet;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentMap;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicBoolean;
042import java.util.concurrent.atomic.AtomicLong;
043import java.util.concurrent.atomic.LongAdder;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.FileSystem;
046import org.apache.hadoop.fs.Path;
047import org.apache.hadoop.hbase.CacheEvictionStats;
048import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
049import org.apache.hadoop.hbase.Cell;
050import org.apache.hadoop.hbase.CellScanner;
051import org.apache.hadoop.hbase.CellUtil;
052import org.apache.hadoop.hbase.DoNotRetryIOException;
053import org.apache.hadoop.hbase.DroppedSnapshotException;
054import org.apache.hadoop.hbase.ExtendedCellScannable;
055import org.apache.hadoop.hbase.ExtendedCellScanner;
056import org.apache.hadoop.hbase.HBaseIOException;
057import org.apache.hadoop.hbase.HBaseRpcServicesBase;
058import org.apache.hadoop.hbase.HConstants;
059import org.apache.hadoop.hbase.MultiActionResultTooLarge;
060import org.apache.hadoop.hbase.NotServingRegionException;
061import org.apache.hadoop.hbase.PrivateCellUtil;
062import org.apache.hadoop.hbase.RegionTooBusyException;
063import org.apache.hadoop.hbase.Server;
064import org.apache.hadoop.hbase.ServerName;
065import org.apache.hadoop.hbase.TableName;
066import org.apache.hadoop.hbase.UnknownScannerException;
067import org.apache.hadoop.hbase.client.Append;
068import org.apache.hadoop.hbase.client.CheckAndMutate;
069import org.apache.hadoop.hbase.client.CheckAndMutateResult;
070import org.apache.hadoop.hbase.client.ClientInternalHelper;
071import org.apache.hadoop.hbase.client.Delete;
072import org.apache.hadoop.hbase.client.Durability;
073import org.apache.hadoop.hbase.client.Get;
074import org.apache.hadoop.hbase.client.Increment;
075import org.apache.hadoop.hbase.client.Mutation;
076import org.apache.hadoop.hbase.client.OperationWithAttributes;
077import org.apache.hadoop.hbase.client.Put;
078import org.apache.hadoop.hbase.client.QueryMetrics;
079import org.apache.hadoop.hbase.client.RegionInfo;
080import org.apache.hadoop.hbase.client.RegionReplicaUtil;
081import org.apache.hadoop.hbase.client.Result;
082import org.apache.hadoop.hbase.client.Row;
083import org.apache.hadoop.hbase.client.Scan;
084import org.apache.hadoop.hbase.client.TableDescriptor;
085import org.apache.hadoop.hbase.client.VersionInfoUtil;
086import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
087import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
088import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
089import org.apache.hadoop.hbase.exceptions.ScannerResetException;
090import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
091import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
092import org.apache.hadoop.hbase.io.ByteBuffAllocator;
093import org.apache.hadoop.hbase.io.hfile.BlockCache;
094import org.apache.hadoop.hbase.ipc.HBaseRpcController;
095import org.apache.hadoop.hbase.ipc.PriorityFunction;
096import org.apache.hadoop.hbase.ipc.QosPriority;
097import org.apache.hadoop.hbase.ipc.RpcCall;
098import org.apache.hadoop.hbase.ipc.RpcCallContext;
099import org.apache.hadoop.hbase.ipc.RpcCallback;
100import org.apache.hadoop.hbase.ipc.RpcServer;
101import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
102import org.apache.hadoop.hbase.ipc.RpcServerFactory;
103import org.apache.hadoop.hbase.ipc.RpcServerInterface;
104import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
105import org.apache.hadoop.hbase.ipc.ServerRpcController;
106import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
107import org.apache.hadoop.hbase.net.Address;
108import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
109import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
110import org.apache.hadoop.hbase.quotas.OperationQuota;
111import org.apache.hadoop.hbase.quotas.QuotaUtil;
112import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
113import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
114import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
115import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
116import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease;
117import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException;
118import org.apache.hadoop.hbase.regionserver.Region.Operation;
119import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
120import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
121import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler;
122import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
123import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
124import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
125import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
126import org.apache.hadoop.hbase.replication.ReplicationUtils;
127import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker;
128import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker;
129import org.apache.hadoop.hbase.security.Superusers;
130import org.apache.hadoop.hbase.security.access.Permission;
131import org.apache.hadoop.hbase.util.Bytes;
132import org.apache.hadoop.hbase.util.DNS;
133import org.apache.hadoop.hbase.util.DNS.ServerType;
134import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
135import org.apache.hadoop.hbase.util.Pair;
136import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
137import org.apache.hadoop.hbase.wal.WAL;
138import org.apache.hadoop.hbase.wal.WALEdit;
139import org.apache.hadoop.hbase.wal.WALKey;
140import org.apache.hadoop.hbase.wal.WALSplitUtil;
141import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
142import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
143import org.apache.yetus.audience.InterfaceAudience;
144import org.slf4j.Logger;
145import org.slf4j.LoggerFactory;
146
147import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
148import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
149import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
150import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
151import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
152import org.apache.hbase.thirdparty.com.google.protobuf.Message;
153import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
154import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
155import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
156import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
157import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
158
159import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
160import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
161import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
250
251/**
252 * Implements the regionserver RPC services.
253 */
254@InterfaceAudience.Private
255public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
256  implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface {
257
  private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);

  /** RPC scheduler to use for the region server. */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Cap on the total result size a scanner may return per RPC. Volatile, so presumably refreshed
  // by a dynamic configuration update path outside this view -- TODO confirm.
  private volatile long maxScannerResultSize;

  // Generates the unique ids used as keys in the scanners map below.
  private ScannerIdGenerator scannerIdGenerator;
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name and last sequence number of a closed scanner for a while. This is used
  // to keep compatible for old clients which may send next or close request to a region
  // scanner which has already been exhausted. The entries will be removed automatically
  // after scannerLeaseTimeoutPeriod.
  private final Cache<String, Long> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private volatile int rowSizeWarnThreshold;
  /**
   * Whether we should reject requests with very high no of rows i.e. beyond threshold defined by
   * rowSizeWarnThreshold
   */
  private volatile boolean rejectRowsWithSizeOverThreshold;

  // Set to request that compaction queues be drained; NOTE(review): the exact handshake with the
  // compaction machinery (who resets this) is outside this view -- confirm there.
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below booleans
   * to selectively enable/disable these services (Rare is the case where you would ever turn off
   * one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
    "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
    "hbase.regionserver.client.executorService";
  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
    "hbase.regionserver.client.meta.executorService";
  public static final String REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG =
    "hbase.regionserver.bootstrap.nodes.executorService";
356
357  /**
358   * An Rpc callback for closing a RegionScanner.
359   */
360  private static final class RegionScannerCloseCallBack implements RpcCallback {
361
362    private final RegionScanner scanner;
363
364    public RegionScannerCloseCallBack(RegionScanner scanner) {
365      this.scanner = scanner;
366    }
367
368    @Override
369    public void run() throws IOException {
370      this.scanner.close();
371    }
372  }
373
374  /**
375   * An Rpc callback for doing shipped() call on a RegionScanner.
376   */
377  private class RegionScannerShippedCallBack implements RpcCallback {
378    private final String scannerName;
379    private final Shipper shipper;
380    private final Lease lease;
381
382    public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
383      this.scannerName = scannerName;
384      this.shipper = shipper;
385      this.lease = lease;
386    }
387
388    @Override
389    public void run() throws IOException {
390      this.shipper.shipped();
391      // We're done. On way out re-add the above removed lease. The lease was temp removed for this
392      // Rpc call and we are at end of the call now. Time to add it back.
393      if (scanners.containsKey(scannerName)) {
394        if (lease != null) {
395          server.getLeaseManager().addLease(lease);
396        }
397      }
398    }
399  }
400
401  /**
402   * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
403   * completion of multiGets.
404   */
405  static class RegionScannersCloseCallBack implements RpcCallback {
406    private final List<RegionScanner> scanners = new ArrayList<>();
407
408    public void addScanner(RegionScanner scanner) {
409      this.scanners.add(scanner);
410    }
411
412    @Override
413    public void run() {
414      for (RegionScanner scanner : scanners) {
415        try {
416          scanner.close();
417        } catch (IOException e) {
418          LOG.error("Exception while closing the scanner " + scanner, e);
419        }
420      }
421    }
422  }
423
424  static class RegionScannerContext {
425    final String scannerName;
426    final RegionScannerHolder holder;
427    final OperationQuota quota;
428
429    RegionScannerContext(String scannerName, RegionScannerHolder holder, OperationQuota quota) {
430      this.scannerName = scannerName;
431      this.holder = holder;
432      this.quota = quota;
433    }
434  }
435
436  /**
437   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
438   */
439  static final class RegionScannerHolder {
440    private final AtomicLong nextCallSeq = new AtomicLong(0);
441    private final RegionScanner s;
442    private final HRegion r;
443    private final RpcCallback closeCallBack;
444    private final RpcCallback shippedCallback;
445    private byte[] rowOfLastPartialResult;
446    private boolean needCursor;
447    private boolean fullRegionScan;
448    private final String clientIPAndPort;
449    private final String userName;
450    private volatile long maxBlockBytesScanned = 0;
451    private volatile long prevBlockBytesScanned = 0;
452    private volatile long prevBlockBytesScannedDifference = 0;
453
454    RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
455      RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
456      String clientIPAndPort, String userName) {
457      this.s = s;
458      this.r = r;
459      this.closeCallBack = closeCallBack;
460      this.shippedCallback = shippedCallback;
461      this.needCursor = needCursor;
462      this.fullRegionScan = fullRegionScan;
463      this.clientIPAndPort = clientIPAndPort;
464      this.userName = userName;
465    }
466
467    long getNextCallSeq() {
468      return nextCallSeq.get();
469    }
470
471    boolean incNextCallSeq(long currentSeq) {
472      // Use CAS to prevent multiple scan request running on the same scanner.
473      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
474    }
475
476    long getMaxBlockBytesScanned() {
477      return maxBlockBytesScanned;
478    }
479
480    long getPrevBlockBytesScannedDifference() {
481      return prevBlockBytesScannedDifference;
482    }
483
484    void updateBlockBytesScanned(long blockBytesScanned) {
485      prevBlockBytesScannedDifference = blockBytesScanned - prevBlockBytesScanned;
486      prevBlockBytesScanned = blockBytesScanned;
487      if (blockBytesScanned > maxBlockBytesScanned) {
488        maxBlockBytesScanned = blockBytesScanned;
489      }
490    }
491
492    // Should be called only when we need to print lease expired messages otherwise
493    // cache the String once made.
494    @Override
495    public String toString() {
496      return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName
497        + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString();
498    }
499  }
500
501  /**
502   * Instantiated as a scanner lease. If the lease times out, the scanner is closed
503   */
504  private class ScannerListener implements LeaseListener {
505    private final String scannerName;
506
507    ScannerListener(final String n) {
508      this.scannerName = n;
509    }
510
511    @Override
512    public void leaseExpired() {
513      RegionScannerHolder rsh = scanners.remove(this.scannerName);
514      if (rsh == null) {
515        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
516        return;
517      }
518      LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
519      server.getMetrics().incrScannerLeaseExpired();
520      RegionScanner s = rsh.s;
521      HRegion region = null;
522      try {
523        region = server.getRegion(s.getRegionInfo().getRegionName());
524        if (region != null && region.getCoprocessorHost() != null) {
525          region.getCoprocessorHost().preScannerClose(s);
526        }
527      } catch (IOException e) {
528        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
529      } finally {
530        try {
531          s.close();
532          if (region != null && region.getCoprocessorHost() != null) {
533            region.getCoprocessorHost().postScannerClose(s);
534          }
535        } catch (IOException e) {
536          LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
537        }
538      }
539    }
540  }
541
542  private static ResultOrException getResultOrException(final ClientProtos.Result r,
543    final int index) {
544    return getResultOrException(ResponseConverter.buildActionResult(r), index);
545  }
546
547  private static ResultOrException getResultOrException(final Exception e, final int index) {
548    return getResultOrException(ResponseConverter.buildActionResult(e), index);
549  }
550
551  private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
552    final int index) {
553    return builder.setIndex(index).build();
554  }
555
556  /**
557   * Checks for the following pre-checks in order:
558   * <ol>
559   * <li>RegionServer is running</li>
560   * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
561   * </ol>
562   * @param requestName name of rpc request. Used in reporting failures to provide context.
563   * @throws ServiceException If any of the above listed pre-check fails.
564   */
565  private void rpcPreCheck(String requestName) throws ServiceException {
566    try {
567      checkOpen();
568      requirePermission(requestName, Permission.Action.ADMIN);
569    } catch (IOException ioe) {
570      throw new ServiceException(ioe);
571    }
572  }
573
574  private boolean isClientCellBlockSupport(RpcCallContext context) {
575    return context != null && context.isClientCellBlockSupported();
576  }
577
578  private void addResult(final MutateResponse.Builder builder, final Result result,
579    final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
580    if (result == null) return;
581    if (clientCellBlockSupported) {
582      builder.setResult(ProtobufUtil.toResultNoData(result));
583      rpcc.setCellScanner(result.cellScanner());
584    } else {
585      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
586      builder.setResult(pbr);
587    }
588  }
589
590  private void addResults(ScanResponse.Builder builder, List<Result> results,
591    HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
592    builder.setStale(!isDefaultRegion);
593    if (results.isEmpty()) {
594      return;
595    }
596    if (clientCellBlockSupported) {
597      for (Result res : results) {
598        builder.addCellsPerResult(res.size());
599        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
600      }
601      controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(results));
602    } else {
603      for (Result res : results) {
604        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
605        builder.addResults(pbr);
606      }
607    }
608  }
609
  /**
   * Atomically execute the Put/Delete/Increment/Append actions of a checkAndMutate request
   * against {@code region}, invoking the coprocessor pre/post hooks around the region call.
   * @param region                the region the conditional mutation runs on
   * @param actions               the client's mutation actions; a Get action is rejected
   * @param cellScanner           cell data backing the mutation protos; kept aligned in the
   *                              finally block even when we bail out early
   * @param condition             the condition gating the mutations
   * @param nonceGroup            nonce group used for Increment/Append idempotence
   * @param spaceQuotaEnforcement active space-quota policies each mutation is checked against
   * @return the result of the conditional mutation, possibly supplied or transformed by
   *         coprocessors
   * @throws IOException on malformed requests or region-level failure
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
    CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Number of mutations whose cells were fully consumed from the cell scanner; used in the
    // finally block to skip the cells of any actions we never materialized.
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      List<Mutation> mutations = new ArrayList<>();
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : actions) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto mutation = action.getMutation();
        MutationType type = mutation.getMutateType();
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(mutation, cellScanner);
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            mutations.add(put);
            break;
          case DELETE:
            Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
            ++countOfCompleteMutation;
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            mutations.add(del);
            break;
          case INCREMENT:
            Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
            // Only Increment/Append carry a nonce; the last one seen wins.
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, increment);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
            mutations.add(increment);
            break;
          case APPEND:
            Append append = ProtobufUtil.toAppend(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, append);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
            mutations.add(append);
            break;
          default:
            throw new DoNotRetryIOException("invalid mutation type : " + type);
        }
      }

      if (mutations.size() == 0) {
        return new CheckAndMutateResult(true, null);
      } else {
        CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
        CheckAndMutateResult result = null;
        if (region.getCoprocessorHost() != null) {
          result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
        }
        // A non-null result from preCheckAndMutate means a coprocessor bypassed the default
        // operation; otherwise run it and give postCheckAndMutate a chance to act on the result.
        if (result == null) {
          result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
          if (region.getCoprocessorHost() != null) {
            result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
          }
        }
        return result;
      }
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }
686
687  /**
688   * Execute an append mutation.
689   * @return result to return to client if default operation should be bypassed as indicated by
690   *         RegionObserver, null otherwise
691   */
692  private Result append(final HRegion region, final OperationQuota quota,
693    final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
694    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
695    long before = EnvironmentEdgeManager.currentTime();
696    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
697    checkCellSizeLimit(region, append);
698    spaceQuota.getPolicyEnforcement(region).check(append);
699    quota.addMutation(append);
700    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
701    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
702    Result r = region.append(append, nonceGroup, nonce);
703    if (server.getMetrics() != null) {
704      long blockBytesScanned =
705        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
706      server.getMetrics().updateAppend(region, EnvironmentEdgeManager.currentTime() - before,
707        blockBytesScanned);
708    }
709    return r == null ? Result.EMPTY_RESULT : r;
710  }
711
712  /**
713   * Execute an increment mutation.
714   */
715  private Result increment(final HRegion region, final OperationQuota quota,
716    final MutationProto mutation, final CellScanner cells, long nonceGroup,
717    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
718    long before = EnvironmentEdgeManager.currentTime();
719    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
720    checkCellSizeLimit(region, increment);
721    spaceQuota.getPolicyEnforcement(region).check(increment);
722    quota.addMutation(increment);
723    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
724    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
725    Result r = region.increment(increment, nonceGroup, nonce);
726    final MetricsRegionServer metricsRegionServer = server.getMetrics();
727    if (metricsRegionServer != null) {
728      long blockBytesScanned =
729        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
730      metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before,
731        blockBytesScanned);
732    }
733    return r == null ? Result.EMPTY_RESULT : r;
734  }
735
736  /**
737   * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
738   * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
739   * @param cellsToReturn Could be null. May be allocated in this method. This is what this method
740   *                      returns as a 'result'.
741   * @param closeCallBack the callback to be used with multigets
742   * @param context       the current RpcCallContext
743   * @return Return the <code>cellScanner</code> passed
744   */
745  private List<ExtendedCellScannable> doNonAtomicRegionMutation(final HRegion region,
746    final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
747    final RegionActionResult.Builder builder, List<ExtendedCellScannable> cellsToReturn,
748    long nonceGroup, final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
749    ActivePolicyEnforcement spaceQuotaEnforcement) {
750    // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
751    // one at a time, we instead pass them in batch. Be aware that the corresponding
752    // ResultOrException instance that matches each Put or Delete is then added down in the
753    // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
754    // deferred/batched
755    List<ClientProtos.Action> mutations = null;
756    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
757    IOException sizeIOE = null;
758    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
759      ResultOrException.newBuilder();
760    boolean hasResultOrException = false;
761    for (ClientProtos.Action action : actions.getActionList()) {
762      hasResultOrException = false;
763      resultOrExceptionBuilder.clear();
764      try {
765        Result r = null;
766        long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
767        if (
768          context != null && context.isRetryImmediatelySupported()
769            && (context.getResponseCellSize() > maxQuotaResultSize
770              || blockBytesScannedBefore + context.getResponseExceptionSize() > maxQuotaResultSize)
771        ) {
772
773          // We're storing the exception since the exception and reason string won't
774          // change after the response size limit is reached.
775          if (sizeIOE == null) {
776            // We don't need the stack un-winding do don't throw the exception.
777            // Throwing will kill the JVM's JIT.
778            //
779            // Instead just create the exception and then store it.
780            sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
781              + context.getResponseCellSize() + " BlockSize: " + blockBytesScannedBefore);
782
783            // Only report the exception once since there's only one request that
784            // caused the exception. Otherwise this number will dominate the exceptions count.
785            rpcServer.getMetrics().exception(sizeIOE);
786          }
787
788          // Now that there's an exception is known to be created
789          // use it for the response.
790          //
791          // This will create a copy in the builder.
792          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
793          resultOrExceptionBuilder.setException(pair);
794          context.incrementResponseExceptionSize(pair.getSerializedSize());
795          resultOrExceptionBuilder.setIndex(action.getIndex());
796          builder.addResultOrException(resultOrExceptionBuilder.build());
797          skipCellsForMutation(action, cellScanner);
798          continue;
799        }
800        if (action.hasGet()) {
801          long before = EnvironmentEdgeManager.currentTime();
802          ClientProtos.Get pbGet = action.getGet();
803          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
804          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
805          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
806          // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
807          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
808            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
809              + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
810              + "reverse Scan.");
811          }
812          try {
813            Get get = ProtobufUtil.toGet(pbGet);
814            if (context != null) {
815              r = get(get, (region), closeCallBack, context);
816            } else {
817              r = region.get(get);
818            }
819          } finally {
820            final MetricsRegionServer metricsRegionServer = server.getMetrics();
821            if (metricsRegionServer != null) {
822              long blockBytesScanned =
823                context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
824              metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
825                blockBytesScanned);
826            }
827          }
828        } else if (action.hasServiceCall()) {
829          hasResultOrException = true;
830          Message result = execServiceOnRegion(region, action.getServiceCall());
831          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
832            ClientProtos.CoprocessorServiceResult.newBuilder();
833          resultOrExceptionBuilder.setServiceResult(serviceResultBuilder
834            .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName())
835              // TODO: Copy!!!
836              .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
837        } else if (action.hasMutation()) {
838          MutationType type = action.getMutation().getMutateType();
839          if (
840            type != MutationType.PUT && type != MutationType.DELETE && mutations != null
841              && !mutations.isEmpty()
842          ) {
843            // Flush out any Puts or Deletes already collected.
844            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
845              spaceQuotaEnforcement);
846            mutations.clear();
847          }
848          switch (type) {
849            case APPEND:
850              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
851                spaceQuotaEnforcement, context);
852              break;
853            case INCREMENT:
854              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
855                spaceQuotaEnforcement, context);
856              break;
857            case PUT:
858            case DELETE:
859              // Collect the individual mutations and apply in a batch
860              if (mutations == null) {
861                mutations = new ArrayList<>(actions.getActionCount());
862              }
863              mutations.add(action);
864              break;
865            default:
866              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
867          }
868        } else {
869          throw new HBaseIOException("Unexpected Action type");
870        }
871        if (r != null) {
872          ClientProtos.Result pbResult = null;
873          if (isClientCellBlockSupport(context)) {
874            pbResult = ProtobufUtil.toResultNoData(r);
875            // Hard to guess the size here. Just make a rough guess.
876            if (cellsToReturn == null) {
877              cellsToReturn = new ArrayList<>();
878            }
879            cellsToReturn.add(r);
880          } else {
881            pbResult = ProtobufUtil.toResult(r);
882          }
883          addSize(context, r);
884          hasResultOrException = true;
885          resultOrExceptionBuilder.setResult(pbResult);
886        }
887        // Could get to here and there was no result and no exception. Presumes we added
888        // a Put or Delete to the collecting Mutations List for adding later. In this
889        // case the corresponding ResultOrException instance for the Put or Delete will be added
890        // down in the doNonAtomicBatchOp method call rather than up here.
891      } catch (IOException ie) {
892        rpcServer.getMetrics().exception(ie);
893        hasResultOrException = true;
894        NameBytesPair pair = ResponseConverter.buildException(ie);
895        resultOrExceptionBuilder.setException(pair);
896        context.incrementResponseExceptionSize(pair.getSerializedSize());
897      }
898      if (hasResultOrException) {
899        // Propagate index.
900        resultOrExceptionBuilder.setIndex(action.getIndex());
901        builder.addResultOrException(resultOrExceptionBuilder.build());
902      }
903    }
904    // Finish up any outstanding mutations
905    if (!CollectionUtils.isEmpty(mutations)) {
906      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
907    }
908    return cellsToReturn;
909  }
910
911  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
912    if (r.maxCellSize > 0) {
913      CellScanner cells = m.cellScanner();
914      while (cells.advance()) {
915        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
916        if (size > r.maxCellSize) {
917          String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of "
918            + r.maxCellSize + " bytes";
919          LOG.debug(msg);
920          throw new DoNotRetryIOException(msg);
921        }
922      }
923    }
924  }
925
  /**
   * Execute a batch of mutations atomically under the given nonce group.
   * <p>
   * Any exception is simply propagated: the caller catches it and turns it into a region-level
   * exception for the RegionAction. Leaving a null action result is fine since a null result is
   * viewed as a failure by the hbase client, and the region-level exception will be used to
   * replace the null result; see AsyncRequestFutureImpl#receiveMultiAction and
   * AsyncBatchRpcRetryingCaller#onComplete for more details.
   */
  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
  }
936
937  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
938    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
939    ActivePolicyEnforcement spaceQuotaEnforcement) {
940    try {
941      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
942        spaceQuotaEnforcement, false);
943    } catch (IOException e) {
944      // Set the exception for each action. The mutations in same RegionAction are group to
945      // different batch and then be processed individually. Hence, we don't set the region-level
946      // exception here for whole RegionAction.
947      for (Action mutation : mutations) {
948        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
949      }
950    }
951  }
952
953  /**
954   * Execute a list of mutations.
955   */
956  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
957    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
958    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
959    throws IOException {
960    Mutation[] mArray = new Mutation[mutations.size()];
961    long before = EnvironmentEdgeManager.currentTime();
962    boolean batchContainsPuts = false, batchContainsDelete = false;
963    try {
964      /**
965       * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions
966       * since mutation array may have been reoredered.In order to return the right result or
967       * exception to the corresponding actions, We need to know which action is the mutation belong
968       * to. We can't sort ClientProtos.Action array, since they are bonded to cellscanners.
969       */
970      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
971      int i = 0;
972      long nonce = HConstants.NO_NONCE;
973      for (ClientProtos.Action action : mutations) {
974        if (action.hasGet()) {
975          throw new DoNotRetryIOException(
976            "Atomic put and/or delete only, not a Get=" + action.getGet());
977        }
978        MutationProto m = action.getMutation();
979        Mutation mutation;
980        switch (m.getMutateType()) {
981          case PUT:
982            mutation = ProtobufUtil.toPut(m, cells);
983            batchContainsPuts = true;
984            break;
985
986          case DELETE:
987            mutation = ProtobufUtil.toDelete(m, cells);
988            batchContainsDelete = true;
989            break;
990
991          case INCREMENT:
992            mutation = ProtobufUtil.toIncrement(m, cells);
993            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
994            break;
995
996          case APPEND:
997            mutation = ProtobufUtil.toAppend(m, cells);
998            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
999            break;
1000
1001          default:
1002            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
1003        }
1004        mutationActionMap.put(mutation, action);
1005        mArray[i++] = mutation;
1006        checkCellSizeLimit(region, mutation);
1007        // Check if a space quota disallows this mutation
1008        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
1009        quota.addMutation(mutation);
1010      }
1011
1012      if (!region.getRegionInfo().isMetaRegion()) {
1013        server.getMemStoreFlusher().reclaimMemStoreMemory();
1014      }
1015
1016      // HBASE-17924
1017      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
1018      // order is preserved as its expected from the client
1019      if (!atomic) {
1020        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
1021      }
1022
1023      OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);
1024
1025      // When atomic is true, it indicates that the mutateRow API or the batch API with
1026      // RowMutations is called. In this case, we need to merge the results of the
1027      // Increment/Append operations if the mutations include those operations, and set the merged
1028      // result to the first element of the ResultOrException list
1029      if (atomic) {
1030        List<ResultOrException> resultOrExceptions = new ArrayList<>();
1031        List<Result> results = new ArrayList<>();
1032        for (i = 0; i < codes.length; i++) {
1033          if (codes[i].getResult() != null) {
1034            results.add(codes[i].getResult());
1035          }
1036          if (i != 0) {
1037            resultOrExceptions
1038              .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i));
1039          }
1040        }
1041
1042        if (results.isEmpty()) {
1043          builder.addResultOrException(
1044            getResultOrException(ClientProtos.Result.getDefaultInstance(), 0));
1045        } else {
1046          // Merge the results of the Increment/Append operations
1047          List<Cell> cellList = new ArrayList<>();
1048          for (Result result : results) {
1049            if (result.rawCells() != null) {
1050              cellList.addAll(Arrays.asList(result.rawCells()));
1051            }
1052          }
1053          Result result = Result.create(cellList);
1054
1055          // Set the merged result of the Increment/Append operations to the first element of the
1056          // ResultOrException list
1057          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
1058        }
1059
1060        builder.addAllResultOrException(resultOrExceptions);
1061        return;
1062      }
1063
1064      for (i = 0; i < codes.length; i++) {
1065        Mutation currentMutation = mArray[i];
1066        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
1067        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
1068        Exception e;
1069        switch (codes[i].getOperationStatusCode()) {
1070          case BAD_FAMILY:
1071            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
1072            builder.addResultOrException(getResultOrException(e, index));
1073            break;
1074
1075          case SANITY_CHECK_FAILURE:
1076            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
1077            builder.addResultOrException(getResultOrException(e, index));
1078            break;
1079
1080          default:
1081            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
1082            builder.addResultOrException(getResultOrException(e, index));
1083            break;
1084
1085          case SUCCESS:
1086            ClientProtos.Result result = codes[i].getResult() == null
1087              ? ClientProtos.Result.getDefaultInstance()
1088              : ProtobufUtil.toResult(codes[i].getResult());
1089            builder.addResultOrException(getResultOrException(result, index));
1090            break;
1091
1092          case STORE_TOO_BUSY:
1093            e = new RegionTooBusyException(codes[i].getExceptionMsg());
1094            builder.addResultOrException(getResultOrException(e, index));
1095            break;
1096        }
1097      }
1098    } finally {
1099      int processedMutationIndex = 0;
1100      for (Action mutation : mutations) {
1101        // The non-null mArray[i] means the cell scanner has been read.
1102        if (mArray[processedMutationIndex++] == null) {
1103          skipCellsForMutation(mutation, cells);
1104        }
1105      }
1106      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1107    }
1108  }
1109
1110  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
1111    boolean batchContainsDelete) {
1112    final MetricsRegionServer metricsRegionServer = server.getMetrics();
1113    if (metricsRegionServer != null) {
1114      long after = EnvironmentEdgeManager.currentTime();
1115      if (batchContainsPuts) {
1116        metricsRegionServer.updatePutBatch(region, after - starttime);
1117      }
1118      if (batchContainsDelete) {
1119        metricsRegionServer.updateDeleteBatch(region, after - starttime);
1120      }
1121    }
1122  }
1123
1124  /**
1125   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
1126   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
1127   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
1128   *         exceptionMessage if any
1129   * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying
1130   *             edits for secondary replicas any more, see
1131   *             {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}.
1132   */
1133  @Deprecated
1134  private OperationStatus[] doReplayBatchOp(final HRegion region,
1135    final List<MutationReplay> mutations, long replaySeqId) throws IOException {
1136    long before = EnvironmentEdgeManager.currentTime();
1137    boolean batchContainsPuts = false, batchContainsDelete = false;
1138    try {
1139      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
1140        MutationReplay m = it.next();
1141
1142        if (m.getType() == MutationType.PUT) {
1143          batchContainsPuts = true;
1144        } else {
1145          batchContainsDelete = true;
1146        }
1147
1148        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
1149        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
1150        if (metaCells != null && !metaCells.isEmpty()) {
1151          for (Cell metaCell : metaCells) {
1152            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
1153            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1154            HRegion hRegion = region;
1155            if (compactionDesc != null) {
1156              // replay the compaction. Remove the files from stores only if we are the primary
1157              // region replica (thus own the files)
1158              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
1159                replaySeqId);
1160              continue;
1161            }
1162            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
1163            if (flushDesc != null && !isDefaultReplica) {
1164              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
1165              continue;
1166            }
1167            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
1168            if (regionEvent != null && !isDefaultReplica) {
1169              hRegion.replayWALRegionEventMarker(regionEvent);
1170              continue;
1171            }
1172            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
1173            if (bulkLoadEvent != null) {
1174              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
1175              continue;
1176            }
1177          }
1178          it.remove();
1179        }
1180      }
1181      requestCount.increment();
1182      if (!region.getRegionInfo().isMetaRegion()) {
1183        server.getMemStoreFlusher().reclaimMemStoreMemory();
1184      }
1185      return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
1186        replaySeqId);
1187    } finally {
1188      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1189    }
1190  }
1191
1192  private void closeAllScanners() {
1193    // Close any outstanding scanners. Means they'll get an UnknownScanner
1194    // exception next time they come in.
1195    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
1196      try {
1197        e.getValue().s.close();
1198      } catch (IOException ioe) {
1199        LOG.warn("Closing scanner " + e.getKey(), ioe);
1200      }
1201    }
1202  }
1203
1204  // Directly invoked only for testing
1205  public RSRpcServices(final HRegionServer rs) throws IOException {
1206    super(rs, rs.getProcessName());
1207    final Configuration conf = rs.getConfiguration();
1208    setReloadableGuardrails(conf);
1209    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
1210      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
1211    rpcTimeout =
1212      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
1213    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
1214      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
1215    rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
1216    closedScanners = CacheBuilder.newBuilder()
1217      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
1218  }
1219
  /** Region servers enable the byte-buffer reservoir by default. */
  @Override
  protected boolean defaultReservoirEnabled() {
    return true;
  }
1224
  /** This RPC-services implementation belongs to a region server. */
  @Override
  protected ServerType getDNSServerType() {
    return DNS.ServerType.REGIONSERVER;
  }
1229
  /** Returns the hostname from "hbase.regionserver.ipc.address", falling back to the default. */
  @Override
  protected String getHostname(Configuration conf, String defaultHostname) {
    return conf.get("hbase.regionserver.ipc.address", defaultHostname);
  }
1234
  /** Returns the configuration key used to look up the region server port. */
  @Override
  protected String getPortConfigName() {
    return HConstants.REGIONSERVER_PORT;
  }
1239
  /** Returns the default region server port used when none is configured. */
  @Override
  protected int getDefaultPort() {
    return HConstants.DEFAULT_REGIONSERVER_PORT;
  }
1244
  /**
   * Returns the configured RpcSchedulerFactory implementation class, defaulting to
   * {@link SimpleRpcSchedulerFactory}.
   */
  @Override
  protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }
1250
1251  protected RpcServerInterface createRpcServer(final Server server,
1252    final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
1253    final String name) throws IOException {
1254    final Configuration conf = server.getConfiguration();
1255    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
1256    try {
1257      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, // use final
1258                                                                                        // bindAddress
1259                                                                                        // for this
1260                                                                                        // server.
1261        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
1262    } catch (BindException be) {
1263      throw new IOException(be.getMessage() + ". To switch ports use the '"
1264        + HConstants.REGIONSERVER_PORT + "' configuration property.",
1265        be.getCause() != null ? be.getCause() : be);
1266    }
1267  }
1268
1269  protected Class<?> getRpcSchedulerFactoryClass() {
1270    final Configuration conf = server.getConfiguration();
1271    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
1272      SimpleRpcSchedulerFactory.class);
1273  }
1274
  /** Creates the annotation-reading priority function used to prioritize RPC requests. */
  protected PriorityFunction createPriority() {
    return new RSAnnotationReadingPriorityFunction(this);
  }
1278
  /** Returns the number of currently outstanding region scanners. */
  public int getScannersCount() {
    return scanners.size();
  }
1282
1283  /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
1284  RegionScanner getScanner(long scannerId) {
1285    RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId);
1286    return rsh == null ? null : rsh.s;
1287  }
1288
  /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null. */
  private RegionScannerHolder checkQuotaAndGetRegionScannerContext(long scannerId) {
    // Scanner map is keyed by the string form of the id; see toScannerName.
    return scanners.get(toScannerName(scannerId));
  }
1293
1294  public String getScanDetailsWithId(long scannerId) {
1295    RegionScanner scanner = getScanner(scannerId);
1296    if (scanner == null) {
1297      return null;
1298    }
1299    StringBuilder builder = new StringBuilder();
1300    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
1301    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
1302    builder.append(" operation_id: ").append(scanner.getOperationId());
1303    return builder.toString();
1304  }
1305
1306  public String getScanDetailsWithRequest(ScanRequest request) {
1307    try {
1308      if (!request.hasRegion()) {
1309        return null;
1310      }
1311      Region region = getRegion(request.getRegion());
1312      StringBuilder builder = new StringBuilder();
1313      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
1314      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
1315      for (NameBytesPair pair : request.getScan().getAttributeList()) {
1316        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
1317          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
1318          break;
1319        }
1320      }
1321      return builder.toString();
1322    } catch (IOException ignored) {
1323      return null;
1324    }
1325  }
1326
1327  /**
1328   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
1329   */
1330  long getScannerVirtualTime(long scannerId) {
1331    RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId);
1332    return rsh == null ? 0L : rsh.getNextCallSeq();
1333  }
1334
1335  /**
1336   * Method to account for the size of retained cells.
1337   * @param context rpc call context
1338   * @param r       result to add size.
1339   * @return an object that represents the last referenced block from this response.
1340   */
1341  void addSize(RpcCallContext context, Result r) {
1342    if (context != null && r != null && !r.isEmpty()) {
1343      for (Cell c : r.rawCells()) {
1344        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
1345      }
1346    }
1347  }
1348
1349  /** Returns Remote client's ip and port else null if can't be determined. */
1350  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
1351      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
1352  static String getRemoteClientIpAndPort() {
1353    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
1354    if (rpcCall == null) {
1355      return HConstants.EMPTY_STRING;
1356    }
1357    InetAddress address = rpcCall.getRemoteAddress();
1358    if (address == null) {
1359      return HConstants.EMPTY_STRING;
1360    }
1361    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
1362    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
1363    // scanning than a hostname anyways.
1364    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
1365  }
1366
1367  /** Returns Remote client's username. */
1368  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
1369      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
1370  static String getUserName() {
1371    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
1372    if (rpcCall == null) {
1373      return HConstants.EMPTY_STRING;
1374    }
1375    return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING);
1376  }
1377
  /**
   * Register a newly opened scanner: take out a lease so idle scanners expire, wire up the
   * shipped/close callbacks, and publish the holder in the scanners map.
   * @param scannerName    string form of the scanner id, used as the map key
   * @param s              the region scanner to track
   * @param shipper        invoked once a response has been shipped, to release resources
   * @param r              the region the scanner reads from
   * @param needCursor     passed through to the holder; whether cursor results were requested
   * @param fullRegionScan passed through to the holder; whether the scan spans the whole region
   * @return the holder now registered for this scanner
   * @throws LeaseStillHeldException if a lease already exists under this scanner name
   */
  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
    HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
    Lease lease = server.getLeaseManager().createLease(scannerName, this.scannerLeaseTimeoutPeriod,
      new ScannerListener(scannerName));
    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
    // A scanner that is itself an RpcCallback handles its own close; otherwise wrap it.
    RpcCallback closeCallback =
      s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s);
    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
      needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
    // Ids are generated to be unique for this server's lifetime; a collision indicates a bug
    // in the id generator, hence the assert rather than a runtime failure path.
    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
      + scannerName + ", " + existing;
    return rsh;
  }
1392
1393  private boolean isFullRegionScan(Scan scan, HRegion region) {
1394    // If the scan start row equals or less than the start key of the region
1395    // and stop row greater than equals end key (if stop row present)
1396    // or if the stop row is empty
1397    // account this as a full region scan
1398    if (
1399      Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0
1400        && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0
1401          && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)
1402          || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
1403    ) {
1404      return true;
1405    }
1406    return false;
1407  }
1408
1409  /**
1410   * Find the HRegion based on a region specifier
1411   * @param regionSpecifier the region specifier
1412   * @return the corresponding region
1413   * @throws IOException if the specifier is not null, but failed to find the region
1414   */
1415  public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException {
1416    return server.getRegion(regionSpecifier.getValue().toByteArray());
1417  }
1418
1419  /**
1420   * Find the List of HRegions based on a list of region specifiers
1421   * @param regionSpecifiers the list of region specifiers
1422   * @return the corresponding list of regions
1423   * @throws IOException if any of the specifiers is not null, but failed to find the region
1424   */
1425  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
1426    final CacheEvictionStatsBuilder stats) {
1427    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
1428    for (RegionSpecifier regionSpecifier : regionSpecifiers) {
1429      try {
1430        regions.add(server.getRegion(regionSpecifier.getValue().toByteArray()));
1431      } catch (NotServingRegionException e) {
1432        stats.addException(regionSpecifier.getValue().toByteArray(), e);
1433      }
1434    }
1435    return regions;
1436  }
1437
  /** Returns the priority function used to rank incoming RPC requests. */
  PriorityFunction getPriority() {
    return priority;
  }
1441
  /** Returns the server's RPC quota manager. */
  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return server.getRegionServerRpcQuotaManager();
  }
1445
  /** Returns the server's space quota manager. */
  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return server.getRegionServerSpaceQuotaManager();
  }
1449
  /**
   * Start these RPC services: create the scanner-id generator (seeded with this server's name)
   * before running subclass-specific startup.
   */
  void start(ZKWatcher zkWatcher) {
    this.scannerIdGenerator = new ScannerIdGenerator(this.server.getServerName());
    internalStart(zkWatcher);
  }
1454
  /**
   * Stop these RPC services: close every outstanding scanner, then run subclass-specific
   * shutdown.
   */
  void stop() {
    closeAllScanners();
    internalStop();
  }
1459
1460  /**
1461   * Called to verify that this server is up and running.
1462   */
1463  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
1464  protected void checkOpen() throws IOException {
1465    if (server.isAborted()) {
1466      throw new RegionServerAbortedException("Server " + server.getServerName() + " aborting");
1467    }
1468    if (server.isStopped()) {
1469      throw new RegionServerStoppedException("Server " + server.getServerName() + " stopping");
1470    }
1471    if (!server.isDataFileSystemOk()) {
1472      throw new RegionServerStoppedException("File system not available");
1473    }
1474    if (!server.isOnline()) {
1475      throw new ServerNotRunningYetException(
1476        "Server " + server.getServerName() + " is not running yet");
1477    }
1478  }
1479
1480  /**
1481   * By default, put up an Admin and a Client Service. Set booleans
1482   * <code>hbase.regionserver.admin.executorService</code> and
1483   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
1484   * Default is that both are enabled.
1485   * @return immutable list of blocking services and the security info classes that this server
1486   *         supports
1487   */
1488  protected List<BlockingServiceAndInterface> getServices() {
1489    boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
1490    boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
1491    boolean clientMeta =
1492      getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
1493    boolean bootstrapNodes =
1494      getConfiguration().getBoolean(REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG, true);
1495    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
1496    if (client) {
1497      bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
1498        ClientService.BlockingInterface.class));
1499    }
1500    if (admin) {
1501      bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
1502        AdminService.BlockingInterface.class));
1503    }
1504    if (clientMeta) {
1505      bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
1506        ClientMetaService.BlockingInterface.class));
1507    }
1508    if (bootstrapNodes) {
1509      bssi.add(
1510        new BlockingServiceAndInterface(BootstrapNodeService.newReflectiveBlockingService(this),
1511          BootstrapNodeService.BlockingInterface.class));
1512    }
1513    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
1514  }
1515
1516  /**
1517   * Close a region on the region server.
1518   * @param controller the RPC controller
1519   * @param request    the request
1520   */
1521  @Override
1522  @QosPriority(priority = HConstants.ADMIN_QOS)
1523  public CloseRegionResponse closeRegion(final RpcController controller,
1524    final CloseRegionRequest request) throws ServiceException {
1525    final ServerName sn = (request.hasDestinationServer()
1526      ? ProtobufUtil.toServerName(request.getDestinationServer())
1527      : null);
1528
1529    try {
1530      checkOpen();
1531      throwOnWrongStartCode(request);
1532      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1533
1534      requestCount.increment();
1535      if (sn == null) {
1536        LOG.info("Close " + encodedRegionName + " without moving");
1537      } else {
1538        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1539      }
1540      boolean closed = server.closeRegion(encodedRegionName, false, sn);
1541      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1542      return builder.build();
1543    } catch (IOException ie) {
1544      throw new ServiceException(ie);
1545    }
1546  }
1547
1548  /**
1549   * Compact a region on the region server.
1550   * @param controller the RPC controller
1551   * @param request    the request
1552   */
1553  @Override
1554  @QosPriority(priority = HConstants.ADMIN_QOS)
1555  public CompactRegionResponse compactRegion(final RpcController controller,
1556    final CompactRegionRequest request) throws ServiceException {
1557    try {
1558      checkOpen();
1559      requestCount.increment();
1560      HRegion region = getRegion(request.getRegion());
1561      // Quota support is enabled, the requesting user is not system/super user
1562      // and a quota policy is enforced that disables compactions.
1563      if (
1564        QuotaUtil.isQuotaEnabled(getConfiguration())
1565          && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null))
1566          && this.server.getRegionServerSpaceQuotaManager()
1567            .areCompactionsDisabled(region.getTableDescriptor().getTableName())
1568      ) {
1569        throw new DoNotRetryIOException(
1570          "Compactions on this region are " + "disabled due to a space quota violation.");
1571      }
1572      region.startRegionOperation(Operation.COMPACT_REGION);
1573      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1574      boolean major = request.hasMajor() && request.getMajor();
1575      if (request.hasFamily()) {
1576        byte[] family = request.getFamily().toByteArray();
1577        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
1578          + region.getRegionInfo().getRegionNameAsString() + " and family "
1579          + Bytes.toString(family);
1580        LOG.trace(log);
1581        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
1582          CompactionLifeCycleTracker.DUMMY);
1583      } else {
1584        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
1585          + region.getRegionInfo().getRegionNameAsString();
1586        LOG.trace(log);
1587        region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
1588      }
1589      return CompactRegionResponse.newBuilder().build();
1590    } catch (IOException ie) {
1591      throw new ServiceException(ie);
1592    }
1593  }
1594
1595  @Override
1596  @QosPriority(priority = HConstants.ADMIN_QOS)
1597  public CompactionSwitchResponse compactionSwitch(RpcController controller,
1598    CompactionSwitchRequest request) throws ServiceException {
1599    rpcPreCheck("compactionSwitch");
1600    final CompactSplit compactSplitThread = server.getCompactSplitThread();
1601    requestCount.increment();
1602    boolean prevState = compactSplitThread.isCompactionsEnabled();
1603    CompactionSwitchResponse response =
1604      CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
1605    if (prevState == request.getEnabled()) {
1606      // passed in requested state is same as current state. No action required
1607      return response;
1608    }
1609    compactSplitThread.switchCompaction(request.getEnabled());
1610    return response;
1611  }
1612
1613  /**
1614   * Flush a region on the region server.
1615   * @param controller the RPC controller
1616   * @param request    the request
1617   */
1618  @Override
1619  @QosPriority(priority = HConstants.ADMIN_QOS)
1620  public FlushRegionResponse flushRegion(final RpcController controller,
1621    final FlushRegionRequest request) throws ServiceException {
1622    try {
1623      checkOpen();
1624      requestCount.increment();
1625      HRegion region = getRegion(request.getRegion());
1626      LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1627      boolean shouldFlush = true;
1628      if (request.hasIfOlderThanTs()) {
1629        shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1630      }
1631      FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1632      if (shouldFlush) {
1633        boolean writeFlushWalMarker =
1634          request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false;
1635        // Go behind the curtain so we can manage writing of the flush WAL marker
1636        HRegion.FlushResultImpl flushResult = null;
1637        if (request.hasFamily()) {
1638          List<byte[]> families = new ArrayList();
1639          families.add(request.getFamily().toByteArray());
1640          TableDescriptor tableDescriptor = region.getTableDescriptor();
1641          List<String> noSuchFamilies = families.stream()
1642            .filter(f -> !tableDescriptor.hasColumnFamily(f)).map(Bytes::toString).toList();
1643          if (!noSuchFamilies.isEmpty()) {
1644            throw new NoSuchColumnFamilyException("Column families " + noSuchFamilies
1645              + " don't exist in table " + tableDescriptor.getTableName().getNameAsString());
1646          }
1647          flushResult =
1648            region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1649        } else {
1650          flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1651        }
1652        boolean compactionNeeded = flushResult.isCompactionNeeded();
1653        if (compactionNeeded) {
1654          server.getCompactSplitThread().requestSystemCompaction(region,
1655            "Compaction through user triggered flush");
1656        }
1657        builder.setFlushed(flushResult.isFlushSucceeded());
1658        builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1659      }
1660      builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1661      return builder.build();
1662    } catch (DroppedSnapshotException ex) {
1663      // Cache flush can fail in a few places. If it fails in a critical
1664      // section, we get a DroppedSnapshotException and a replay of wal
1665      // is required. Currently the only way to do this is a restart of
1666      // the server.
1667      server.abort("Replay of WAL required. Forcing server shutdown", ex);
1668      throw new ServiceException(ex);
1669    } catch (IOException ie) {
1670      throw new ServiceException(ie);
1671    }
1672  }
1673
1674  @Override
1675  @QosPriority(priority = HConstants.ADMIN_QOS)
1676  public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1677    final GetOnlineRegionRequest request) throws ServiceException {
1678    try {
1679      checkOpen();
1680      requestCount.increment();
1681      Map<String, HRegion> onlineRegions = server.getOnlineRegions();
1682      List<RegionInfo> list = new ArrayList<>(onlineRegions.size());
1683      for (HRegion region : onlineRegions.values()) {
1684        list.add(region.getRegionInfo());
1685      }
1686      list.sort(RegionInfo.COMPARATOR);
1687      return ResponseConverter.buildGetOnlineRegionResponse(list);
1688    } catch (IOException ie) {
1689      throw new ServiceException(ie);
1690    }
1691  }
1692
  // Master implementation of this Admin Service differs given it is not
  // able to supply detail only known to RegionServer. See note on
  // MasterRpcServers#getRegionInfo.
  /**
   * Returns the RegionInfo for the requested region, optionally with its compaction state and
   * best split row. When a best split row is requested but the memstore holds all data (no store
   * files yet), the region is flushed once and the split check retried.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
    final GetRegionInfoRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      RegionInfo info = region.getRegionInfo();
      byte[] bestSplitRow;
      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
        bestSplitRow = region.checkSplit(true).orElse(null);
        // when all table data are in memstore, bestSplitRow = null
        // try to flush region first
        if (bestSplitRow == null) {
          region.flush(true);
          bestSplitRow = region.checkSplit(true).orElse(null);
        }
      } else {
        bestSplitRow = null;
      }
      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
      if (request.hasCompactionState() && request.getCompactionState()) {
        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
      }
      builder.setSplittable(region.isSplittable());
      builder.setMergeable(region.isMergeable());
      // Only set the split row when one was both requested and found (possibly after a flush).
      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }
1732
1733  @Override
1734  @QosPriority(priority = HConstants.ADMIN_QOS)
1735  public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
1736    throws ServiceException {
1737
1738    List<HRegion> regions;
1739    if (request.hasTableName()) {
1740      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
1741      regions = server.getRegions(tableName);
1742    } else {
1743      regions = server.getRegions();
1744    }
1745    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
1746    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
1747    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1748
1749    try {
1750      for (HRegion region : regions) {
1751        rLoads.add(server.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
1752      }
1753    } catch (IOException e) {
1754      throw new ServiceException(e);
1755    }
1756    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
1757    builder.addAllRegionLoads(rLoads);
1758    return builder.build();
1759  }
1760
  /**
   * Clears the named compaction queues ("long" and/or "short") on this server. Guarded by a
   * compare-and-set flag so only one admin's clear operation runs at a time; concurrent requests
   * are logged and return without doing anything.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
    ClearCompactionQueuesRequest request) throws ServiceException {
    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
      + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
    requestCount.increment();
    // CAS guard: only one clear operation may be in flight at a time.
    if (clearCompactionQueues.compareAndSet(false, true)) {
      final CompactSplit compactSplitThread = server.getCompactSplitThread();
      try {
        checkOpen();
        server.getRegionServerCoprocessorHost().preClearCompactionQueues();
        for (String queueName : request.getQueueNameList()) {
          LOG.debug("clear " + queueName + " compaction queue");
          switch (queueName) {
            case "long":
              compactSplitThread.clearLongCompactionsQueue();
              break;
            case "short":
              compactSplitThread.clearShortCompactionsQueue();
              break;
            default:
              LOG.warn("Unknown queue name " + queueName);
              throw new IOException("Unknown queue name " + queueName);
          }
        }
        server.getRegionServerCoprocessorHost().postClearCompactionQueues();
      } catch (IOException ie) {
        throw new ServiceException(ie);
      } finally {
        // Always release the guard, even when a queue name was invalid.
        clearCompactionQueues.set(false);
      }
    } else {
      LOG.warn("Clear compactions queue is executing by other admin.");
    }
    return respBuilder.build();
  }
1799
1800  /**
1801   * Get some information of the region server.
1802   * @param controller the RPC controller
1803   * @param request    the request
1804   */
1805  @Override
1806  @QosPriority(priority = HConstants.ADMIN_QOS)
1807  public GetServerInfoResponse getServerInfo(final RpcController controller,
1808    final GetServerInfoRequest request) throws ServiceException {
1809    try {
1810      checkOpen();
1811    } catch (IOException ie) {
1812      throw new ServiceException(ie);
1813    }
1814    requestCount.increment();
1815    int infoPort = server.getInfoServer() != null ? server.getInfoServer().getPort() : -1;
1816    return ResponseConverter.buildGetServerInfoResponse(server.getServerName(), infoPort);
1817  }
1818
1819  @Override
1820  @QosPriority(priority = HConstants.ADMIN_QOS)
1821  public GetStoreFileResponse getStoreFile(final RpcController controller,
1822    final GetStoreFileRequest request) throws ServiceException {
1823    try {
1824      checkOpen();
1825      HRegion region = getRegion(request.getRegion());
1826      requestCount.increment();
1827      Set<byte[]> columnFamilies;
1828      if (request.getFamilyCount() == 0) {
1829        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
1830      } else {
1831        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
1832        for (ByteString cf : request.getFamilyList()) {
1833          columnFamilies.add(cf.toByteArray());
1834        }
1835      }
1836      int nCF = columnFamilies.size();
1837      List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
1838      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1839      builder.addAllStoreFile(fileList);
1840      return builder.build();
1841    } catch (IOException ie) {
1842      throw new ServiceException(ie);
1843    }
1844  }
1845
1846  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
1847    if (!request.hasServerStartCode()) {
1848      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
1849      return;
1850    }
1851    throwOnWrongStartCode(request.getServerStartCode());
1852  }
1853
1854  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
1855    if (!request.hasServerStartCode()) {
1856      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
1857      return;
1858    }
1859    throwOnWrongStartCode(request.getServerStartCode());
1860  }
1861
1862  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
1863    // check that we are the same server that this RPC is intended for.
1864    if (server.getServerName().getStartcode() != serverStartCode) {
1865      throw new ServiceException(new DoNotRetryIOException(
1866        "This RPC was intended for a " + "different server with startCode: " + serverStartCode
1867          + ", this server is: " + server.getServerName()));
1868    }
1869  }
1870
1871  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
1872    if (req.getOpenRegionCount() > 0) {
1873      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
1874        throwOnWrongStartCode(openReq);
1875      }
1876    }
1877    if (req.getCloseRegionCount() > 0) {
1878      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
1879        throwOnWrongStartCode(closeReq);
1880      }
1881    }
1882  }
1883
1884  /**
1885   * Open asynchronously a region or a set of regions on the region server. The opening is
1886   * coordinated by ZooKeeper, and this method requires the znode to be created before being called.
1887   * As a consequence, this method should be called only from the master.
1888   * <p>
1889   * Different manages states for the region are:
1890   * </p>
1891   * <ul>
1892   * <li>region not opened: the region opening will start asynchronously.</li>
1893   * <li>a close is already in progress: this is considered as an error.</li>
1894   * <li>an open is already in progress: this new open request will be ignored. This is important
1895   * because the Master can do multiple requests if it crashes.</li>
1896   * <li>the region is already opened: this new open request will be ignored.</li>
1897   * </ul>
1898   * <p>
1899   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
1900   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
1901   * errors are put in the response as FAILED_OPENING.
1902   * </p>
1903   * @param controller the RPC controller
1904   * @param request    the request
1905   */
1906  @Override
1907  @QosPriority(priority = HConstants.ADMIN_QOS)
1908  public OpenRegionResponse openRegion(final RpcController controller,
1909    final OpenRegionRequest request) throws ServiceException {
1910    requestCount.increment();
1911    throwOnWrongStartCode(request);
1912
1913    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
1914    final int regionCount = request.getOpenInfoCount();
1915    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
1916    final boolean isBulkAssign = regionCount > 1;
1917    try {
1918      checkOpen();
1919    } catch (IOException ie) {
1920      TableName tableName = null;
1921      if (regionCount == 1) {
1922        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
1923          request.getOpenInfo(0).getRegion();
1924        if (ri != null) {
1925          tableName = ProtobufUtil.toTableName(ri.getTableName());
1926        }
1927      }
1928      if (!TableName.META_TABLE_NAME.equals(tableName)) {
1929        throw new ServiceException(ie);
1930      }
1931      // We are assigning meta, wait a little for regionserver to finish initialization.
1932      // Default to quarter of RPC timeout
1933      int timeout = server.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1934        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
1935      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
1936      synchronized (server.online) {
1937        try {
1938          while (
1939            EnvironmentEdgeManager.currentTime() <= endTime && !server.isStopped()
1940              && !server.isOnline()
1941          ) {
1942            server.online.wait(server.getMsgInterval());
1943          }
1944          checkOpen();
1945        } catch (InterruptedException t) {
1946          Thread.currentThread().interrupt();
1947          throw new ServiceException(t);
1948        } catch (IOException e) {
1949          throw new ServiceException(e);
1950        }
1951      }
1952    }
1953
1954    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1955
1956    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
1957      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
1958      TableDescriptor htd;
1959      try {
1960        String encodedName = region.getEncodedName();
1961        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
1962        final HRegion onlineRegion = server.getRegion(encodedName);
1963        if (onlineRegion != null) {
1964          // The region is already online. This should not happen any more.
1965          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
1966            + ", which is already online";
1967          LOG.warn(error);
1968          // server.abort(error);
1969          // throw new IOException(error);
1970          builder.addOpeningState(RegionOpeningState.OPENED);
1971          continue;
1972        }
1973        LOG.info("Open " + region.getRegionNameAsString());
1974
1975        final Boolean previous =
1976          server.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);
1977
1978        if (Boolean.FALSE.equals(previous)) {
1979          if (server.getRegion(encodedName) != null) {
1980            // There is a close in progress. This should not happen any more.
1981            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
1982              + ", which we are already trying to CLOSE";
1983            server.abort(error);
1984            throw new IOException(error);
1985          }
1986          server.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
1987        }
1988
1989        if (Boolean.TRUE.equals(previous)) {
1990          // An open is in progress. This is supported, but let's log this.
1991          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
1992            + ", which we are already trying to OPEN"
1993            + " - ignoring this new request for this region.");
1994        }
1995
1996        // We are opening this region. If it moves back and forth for whatever reason, we don't
1997        // want to keep returning the stale moved record while we are opening/if we close again.
1998        server.removeFromMovedRegions(region.getEncodedName());
1999
2000        if (previous == null || !previous.booleanValue()) {
2001          htd = htds.get(region.getTable());
2002          if (htd == null) {
2003            htd = server.getTableDescriptors().get(region.getTable());
2004            htds.put(region.getTable(), htd);
2005          }
2006          if (htd == null) {
2007            throw new IOException("Missing table descriptor for " + region.getEncodedName());
2008          }
2009          // If there is no action in progress, we can submit a specific handler.
2010          // Need to pass the expected version in the constructor.
2011          if (server.getExecutorService() == null) {
2012            LOG.info("No executor executorService; skipping open request");
2013          } else {
2014            if (region.isMetaRegion()) {
2015              server.getExecutorService()
2016                .submit(new OpenMetaHandler(server, server, region, htd, masterSystemTime));
2017            } else {
2018              if (regionOpenInfo.getFavoredNodesCount() > 0) {
2019                server.updateRegionFavoredNodesMapping(region.getEncodedName(),
2020                  regionOpenInfo.getFavoredNodesList());
2021              }
2022              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
2023                server.getExecutorService().submit(
2024                  new OpenPriorityRegionHandler(server, server, region, htd, masterSystemTime));
2025              } else {
2026                server.getExecutorService()
2027                  .submit(new OpenRegionHandler(server, server, region, htd, masterSystemTime));
2028              }
2029            }
2030          }
2031        }
2032
2033        builder.addOpeningState(RegionOpeningState.OPENED);
2034      } catch (IOException ie) {
2035        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
2036        if (isBulkAssign) {
2037          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
2038        } else {
2039          throw new ServiceException(ie);
2040        }
2041      }
2042    }
2043    return builder.build();
2044  }
2045
2046  /**
2047   * Warmup a region on this server. This method should only be called by Master. It synchronously
2048   * opens the region and closes the region bringing the most important pages in cache.
2049   */
2050  @Override
2051  public WarmupRegionResponse warmupRegion(final RpcController controller,
2052    final WarmupRegionRequest request) throws ServiceException {
2053    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
2054    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
2055    try {
2056      checkOpen();
2057      String encodedName = region.getEncodedName();
2058      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
2059      final HRegion onlineRegion = server.getRegion(encodedName);
2060      if (onlineRegion != null) {
2061        LOG.info("{} is online; skipping warmup", region);
2062        return response;
2063      }
2064      TableDescriptor htd = server.getTableDescriptors().get(region.getTable());
2065      if (server.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
2066        LOG.info("{} is in transition; skipping warmup", region);
2067        return response;
2068      }
2069      LOG.info("Warmup {}", region.getRegionNameAsString());
2070      HRegion.warmupHRegion(region, htd, server.getWAL(region), server.getConfiguration(), server,
2071        null);
2072    } catch (IOException ie) {
2073      LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
2074      throw new ServiceException(ie);
2075    }
2076
2077    return response;
2078  }
2079
2080  private ExtendedCellScanner getAndReset(RpcController controller) {
2081    HBaseRpcController hrc = (HBaseRpcController) controller;
2082    ExtendedCellScanner cells = hrc.cellScanner();
2083    hrc.setCellScanner(null);
2084    return cells;
2085  }
2086
  /**
   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
   * that the given mutations will be durable on the receiving RS if this method returns without any
   * exception.
   * @param controller the RPC controller
   * @param request    the request
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for
   *             compatibility with old region replica implementation. Now we will use
   *             {@code replicateToReplica} method instead.
   */
  @Deprecated
  @Override
  @QosPriority(priority = HConstants.REPLAY_QOS)
  public ReplicateWALEntryResponse replay(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    // Detach the cell payload from the controller up front so it is consumed exactly once.
    ExtendedCellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      // All entries in a request must target one region; resolve it from the first entry.
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      RegionCoprocessorHost coprocessorHost =
        ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
          ? region.getCoprocessorHost()
          : null; // do not invoke coprocessors if this is a secondary region replica

      // Skip adding the edits to WAL if this is a secondary region replica
      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;

      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException("Replay request contains entries from multiple "
            + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
            + entry.getKey().getEncodedRegionName());
        }
        // Report nonces carried in the WAL to the nonce manager (primary replica only),
        // so operations replayed here are still known to the dedup machinery.
        if (server.nonceManager != null && isPrimary) {
          long nonceGroup =
            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          server.nonceManager.reportOperationFromWal(nonceGroup, nonce,
            entry.getKey().getWriteTime());
        }
        // walEntry is only populated (by getMutationsFromWALEntry) when coprocessors will run.
        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
        List<MutationReplay> edits =
          WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
        if (edits != null && !edits.isEmpty()) {
          // HBASE-17924
          // sort to improve lock efficiency
          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
          // Prefer the original sequence number when present so replayed edits keep their
          // position relative to earlier flushes/compactions.
          long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
            ? entry.getKey().getOrigSequenceNumber()
            : entry.getKey().getLogSequenceNumber();
          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
          // check if it's a partial success
          for (int i = 0; result != null && i < result.length; i++) {
            if (result[i] != OperationStatus.SUCCESS) {
              throw new IOException(result[i].getExceptionMsg());
            }
          }
        }
      }

      // sync wal at the end because ASYNC_WAL is used above
      WAL wal = region.getWAL();
      if (wal != null) {
        wal.sync();
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record replay latency whether we succeeded or not.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
      }
    }
  }
2170
2171  /**
2172   * Replay the given changes on a secondary replica
2173   */
2174  @Override
2175  public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
2176    ReplicateWALEntryRequest request) throws ServiceException {
2177    CellScanner cells = getAndReset(controller);
2178    try {
2179      checkOpen();
2180      List<WALEntry> entries = request.getEntryList();
2181      if (entries == null || entries.isEmpty()) {
2182        // empty input
2183        return ReplicateWALEntryResponse.newBuilder().build();
2184      }
2185      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
2186      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
2187      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2188        throw new DoNotRetryIOException(
2189          "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
2190      }
2191      for (WALEntry entry : entries) {
2192        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
2193          throw new NotServingRegionException(
2194            "ReplicateToReplica request contains entries from multiple " + "regions. First region:"
2195              + regionName.toStringUtf8() + " , other region:"
2196              + entry.getKey().getEncodedRegionName());
2197        }
2198        region.replayWALEntry(entry, cells);
2199      }
2200      return ReplicateWALEntryResponse.newBuilder().build();
2201    } catch (IOException ie) {
2202      throw new ServiceException(ie);
2203    }
2204  }
2205
2206  private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
2207    ReplicationSourceService replicationSource = server.getReplicationSourceService();
2208    if (replicationSource == null || entries.isEmpty()) {
2209      return;
2210    }
2211    // We can ensure that all entries are for one peer, so only need to check one entry's
2212    // table name. if the table hit sync replication at peer side and the peer cluster
2213    // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply
2214    // those entries according to the design doc.
2215    TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray());
2216    if (
2217      replicationSource.getSyncReplicationPeerInfoProvider().checkState(table,
2218        RejectReplicationRequestStateChecker.get())
2219    ) {
2220      throw new DoNotRetryIOException(
2221        "Reject to apply to sink cluster because sync replication state of sink cluster "
2222          + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table);
2223    }
2224  }
2225
2226  /**
2227   * Replicate WAL entries on the region server.
2228   * @param controller the RPC controller
2229   * @param request    the request
2230   */
2231  @Override
2232  @QosPriority(priority = HConstants.REPLICATION_QOS)
2233  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
2234    final ReplicateWALEntryRequest request) throws ServiceException {
2235    try {
2236      checkOpen();
2237      if (server.getReplicationSinkService() != null) {
2238        requestCount.increment();
2239        List<WALEntry> entries = request.getEntryList();
2240        checkShouldRejectReplicationRequest(entries);
2241        ExtendedCellScanner cellScanner = getAndReset(controller);
2242        server.getRegionServerCoprocessorHost().preReplicateLogEntries();
2243        server.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
2244          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
2245          request.getSourceHFileArchiveDirPath());
2246        server.getRegionServerCoprocessorHost().postReplicateLogEntries();
2247        return ReplicateWALEntryResponse.newBuilder().build();
2248      } else {
2249        throw new ServiceException("Replication services are not initialized yet");
2250      }
2251    } catch (IOException ie) {
2252      throw new ServiceException(ie);
2253    }
2254  }
2255
2256  /**
2257   * Roll the WAL writer of the region server.
2258   * @param controller the RPC controller
2259   * @param request    the request
2260   */
2261  @Override
2262  @QosPriority(priority = HConstants.ADMIN_QOS)
2263  public RollWALWriterResponse rollWALWriter(final RpcController controller,
2264    final RollWALWriterRequest request) throws ServiceException {
2265    try {
2266      checkOpen();
2267      requestCount.increment();
2268      server.getRegionServerCoprocessorHost().preRollWALWriterRequest();
2269      server.getWalRoller().requestRollAll();
2270      server.getRegionServerCoprocessorHost().postRollWALWriterRequest();
2271      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
2272      return builder.build();
2273    } catch (IOException ie) {
2274      throw new ServiceException(ie);
2275    }
2276  }
2277
2278  /**
2279   * Stop the region server.
2280   * @param controller the RPC controller
2281   * @param request    the request
2282   */
2283  @Override
2284  @QosPriority(priority = HConstants.ADMIN_QOS)
2285  public StopServerResponse stopServer(final RpcController controller,
2286    final StopServerRequest request) throws ServiceException {
2287    rpcPreCheck("stopServer");
2288    requestCount.increment();
2289    String reason = request.getReason();
2290    server.stop(reason);
2291    return StopServerResponse.newBuilder().build();
2292  }
2293
2294  @Override
2295  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
2296    UpdateFavoredNodesRequest request) throws ServiceException {
2297    rpcPreCheck("updateFavoredNodes");
2298    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
2299    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
2300    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
2301      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
2302      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
2303        server.updateRegionFavoredNodesMapping(hri.getEncodedName(),
2304          regionUpdateInfo.getFavoredNodesList());
2305      }
2306    }
2307    respBuilder.setResponse(openInfoList.size());
2308    return respBuilder.build();
2309  }
2310
  /**
   * Atomically bulk load several HFiles into an open region
   * @return true if successful, false is failed but recoverably (no action)
   * @throws ServiceException if failed unrecoverably
   */
  @Override
  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
    final BulkLoadHFileRequest request) throws ServiceException {
    long start = EnvironmentEdgeManager.currentTime();
    // The request's cluster-id list records which clusters have already seen this load.
    // If ours is present the load has looped back to us: answer "loaded" without redoing
    // the work. Otherwise stamp our own id into the list before forwarding downstream.
    // NOTE(review): the early return below also skips the bulk-load latency metric in the
    // finally block — presumably intentional, confirm.
    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
    if (clusterIds.contains(this.server.getClusterId())) {
      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
    } else {
      clusterIds.add(this.server.getClusterId());
    }
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
      // -1 means "no size computed"; see the quota-update guard below.
      long sizeToBeLoaded = -1;

      // Check to see if this bulk load would exceed the space quota for this table
      if (spaceQuotaEnabled) {
        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
        SpaceViolationPolicyEnforcement enforcement =
          activeSpaceQuotas.getPolicyEnforcement(region);
        if (enforcement != null) {
          // Bulk loads must still be atomic. We must enact all or none.
          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
          for (FamilyPath familyPath : request.getFamilyPathList()) {
            filePaths.add(familyPath.getPath());
          }
          // Check if the batch of files exceeds the current quota
          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
        }
      }
      // secure bulk load
      Map<byte[], List<Path>> map =
        server.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
      builder.setLoaded(map != null);
      if (map != null) {
        // Treat any negative size as a flag to "ignore" updating the region size as that is
        // not possible to occur in real life (cannot bulk load a file with negative size)
        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
              + sizeToBeLoaded + " bytes");
          }
          // Inform space quotas of the new files for this region
          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
            sizeToBeLoaded);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record bulk-load latency on success and failure alike.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
      }
    }
  }
2376
2377  @Override
2378  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
2379    PrepareBulkLoadRequest request) throws ServiceException {
2380    try {
2381      checkOpen();
2382      requestCount.increment();
2383
2384      HRegion region = getRegion(request.getRegion());
2385
2386      String bulkToken = server.getSecureBulkLoadManager().prepareBulkLoad(region, request);
2387      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
2388      builder.setBulkToken(bulkToken);
2389      return builder.build();
2390    } catch (IOException ie) {
2391      throw new ServiceException(ie);
2392    }
2393  }
2394
2395  @Override
2396  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
2397    CleanupBulkLoadRequest request) throws ServiceException {
2398    try {
2399      checkOpen();
2400      requestCount.increment();
2401
2402      HRegion region = getRegion(request.getRegion());
2403
2404      server.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
2405      return CleanupBulkLoadResponse.newBuilder().build();
2406    } catch (IOException ie) {
2407      throw new ServiceException(ie);
2408    }
2409  }
2410
2411  @Override
2412  public CoprocessorServiceResponse execService(final RpcController controller,
2413    final CoprocessorServiceRequest request) throws ServiceException {
2414    try {
2415      checkOpen();
2416      requestCount.increment();
2417      HRegion region = getRegion(request.getRegion());
2418      Message result = execServiceOnRegion(region, request.getCall());
2419      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
2420      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
2421        region.getRegionInfo().getRegionName()));
2422      // TODO: COPIES!!!!!!
2423      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
2424        org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
2425      return builder.build();
2426    } catch (IOException ie) {
2427      throw new ServiceException(ie);
2428    }
2429  }
2430
2431  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
2432    if (filePaths.isEmpty()) {
2433      // local hdfs
2434      return server.getFileSystem();
2435    }
2436    // source hdfs
2437    return new Path(filePaths.get(0)).getFileSystem(server.getConfiguration());
2438  }
2439
2440  private Message execServiceOnRegion(HRegion region,
2441    final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
2442    // ignore the passed in controller (from the serialized call)
2443    ServerRpcController execController = new ServerRpcController();
2444    return region.execService(execController, serviceCall);
2445  }
2446
2447  private boolean shouldRejectRequestsFromClient(HRegion region) {
2448    TableName table = region.getRegionInfo().getTable();
2449    ReplicationSourceService service = server.getReplicationSourceService();
2450    return service != null && service.getSyncReplicationPeerInfoProvider().checkState(table,
2451      RejectRequestsFromClientStateChecker.get());
2452  }
2453
2454  private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
2455    if (shouldRejectRequestsFromClient(region)) {
2456      throw new DoNotRetryIOException(
2457        region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state.");
2458    }
2459  }
2460
  /**
   * Get data from a table.
   * @param controller the RPC controller
   * @param request    the get request
   */
  @Override
  public GetResponse get(final RpcController controller, final GetRequest request)
    throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    OperationQuota quota = null;
    HRegion region = null;
    // Null when invoked outside a live RPC (e.g. from tests); guarded everywhere below.
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    try {
      checkOpen();
      requestCount.increment();
      rpcGetRequestCount.increment();
      region = getRegion(request.getRegion());
      rejectIfInStandByState(region);

      GetResponse.Builder builder = GetResponse.newBuilder();
      ClientProtos.Get get = request.getGet();
      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
          + "reverse Scan.");
      }
      Boolean existence = null;
      Result r = null;
      quota = getRpcQuotaManager().checkBatchQuota(region, OperationQuota.OperationType.GET);

      Get clientGet = ProtobufUtil.toGet(get);
      // Existence-only gets run the preExists coprocessor hook first; a non-null answer
      // from the hook short-circuits the actual read below.
      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
        existence = region.getCoprocessorHost().preExists(clientGet);
      }
      if (existence == null) {
        if (context != null) {
          r = get(clientGet, (region), null, context);
        } else {
          // for test purpose
          r = region.get(clientGet);
        }
        if (get.getExistenceOnly()) {
          boolean exists = r.getExists();
          if (region.getCoprocessorHost() != null) {
            exists = region.getCoprocessorHost().postExists(clientGet, exists);
          }
          existence = exists;
        }
      }
      if (existence != null) {
        // Existence-only responses carry just a boolean (marked stale for non-default replicas).
        ClientProtos.Result pbr =
          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
        builder.setResult(pbr);
      } else if (r != null) {
        ClientProtos.Result pbr;
        // Ship cells out-of-band as a cell block when the client supports it (1.3+);
        // otherwise fall back to embedding them in the protobuf result.
        if (
          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
        ) {
          pbr = ProtobufUtil.toResultNoData(r);
          ((HBaseRpcController) controller).setCellScanner(
            PrivateCellUtil.createExtendedCellScanner(ClientInternalHelper.getExtendedRawCells(r)));
          addSize(context, r);
        } else {
          pbr = ProtobufUtil.toResult(r);
        }
        builder.setResult(pbr);
      }
      // r.cells is null when an table.exists(get) call
      if (r != null && r.rawCells() != null) {
        quota.addGetResult(r);
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Update latency metrics and release the quota even when the call failed.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null && region != null) {
        long blockBytesScanned = context != null ? context.getBlockBytesScanned() : 0;
        metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
          blockBytesScanned);
      }
      if (quota != null) {
        quota.close();
      }
    }
  }
2552
  /**
   * Internal get used by the RPC handlers. Nearly the same as {@code HRegion#get} but opens an
   * explicit scanner whose close is deferred to the RPC callback so cells can be shipped in a
   * cell block without copying.
   * @param closeCallBack when non-null (the multi() path) the scanner is registered there for
   *                      batch close; when null the scanner is attached to the RpcCallContext.
   * @param context the current RPC call — NOTE(review): dereferenced unconditionally below
   *                (getBlockBytesScanned), so callers must pass a non-null context; confirm.
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
    RpcCallContext context) throws IOException {
    region.prepareGet(get);
    // Reads served by a non-default replica are flagged stale for the client.
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        // A coprocessor bypassed the core read; return whatever it populated.
        region.metricsUpdateForGet();
        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
          stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    // Snapshot block-bytes before the read so per-query metrics can be derived below.
    long blockBytesScannedBefore = context.getBlockBytesScanned();
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet();

    Result r =
      Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
    if (get.isQueryMetricsEnabled()) {
      // Report the block bytes this particular get read, when the client asked for metrics.
      long blockBytesScanned = context.getBlockBytesScanned() - blockBytesScannedBefore;
      r.setMetrics(new QueryMetrics(blockBytesScanned));
    }
    return r;
  }
2609
2610  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
2611    int sum = 0;
2612    String firstRegionName = null;
2613    for (RegionAction regionAction : request.getRegionActionList()) {
2614      if (sum == 0) {
2615        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
2616      }
2617      sum += regionAction.getActionCount();
2618    }
2619    if (sum > rowSizeWarnThreshold) {
2620      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
2621        + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
2622        + RpcServer.getRequestUserName().orElse(null) + "/"
2623        + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
2624      if (rejectRowsWithSizeOverThreshold) {
2625        throw new ServiceException(
2626          "Rejecting large batch operation for current batch with firstRegionName: "
2627            + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
2628            + rowSizeWarnThreshold);
2629      }
2630    }
2631  }
2632
  /**
   * Marks an entire RegionAction as failed on the multi response being built, and keeps the
   * shared cell stream aligned for the RegionActions that follow it.
   */
  private void failRegionAction(MultiResponse.Builder responseBuilder,
    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
    CellScanner cellScanner, Throwable error) {
    rpcServer.getMetrics().exception(error);
    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
    // All Mutations in this RegionAction not executed as we can not see the Region online here
    // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
    // corresponding to these Mutations.
    if (cellScanner != null) {
      skipCellsForMutations(regionAction.getActionList(), cellScanner);
    }
  }
2646
2647  private boolean isReplicationRequest(Action action) {
2648    // replication request can only be put or delete.
2649    if (!action.hasMutation()) {
2650      return false;
2651    }
2652    MutationProto mutation = action.getMutation();
2653    MutationType type = mutation.getMutateType();
2654    if (type != MutationType.PUT && type != MutationType.DELETE) {
2655      return false;
2656    }
2657    // replication will set a special attribute so we can make use of it to decide whether a request
2658    // is for replication.
2659    return mutation.getAttributeList().stream().map(p -> p.getName())
2660      .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
2661  }
2662
2663  /**
2664   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2665   * @param rpcc    the RPC controller
2666   * @param request the multi request
2667   */
2668  @Override
2669  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2670    throws ServiceException {
2671    try {
2672      checkOpen();
2673    } catch (IOException ie) {
2674      throw new ServiceException(ie);
2675    }
2676
2677    checkBatchSizeAndLogLargeSize(request);
2678
2679    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2680    // It is also the conduit via which we pass back data.
2681    HBaseRpcController controller = (HBaseRpcController) rpcc;
2682    CellScanner cellScanner = controller != null ? getAndReset(controller) : null;
2683
2684    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2685
2686    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2687    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2688    this.rpcMultiRequestCount.increment();
2689    this.requestCount.increment();
2690    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2691
2692    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
2693    // following logic is for backward compatibility as old clients still use
2694    // MultiRequest#condition in case of checkAndMutate with RowMutations.
2695    if (request.hasCondition()) {
2696      if (request.getRegionActionList().isEmpty()) {
2697        // If the region action list is empty, do nothing.
2698        responseBuilder.setProcessed(true);
2699        return responseBuilder.build();
2700      }
2701
2702      RegionAction regionAction = request.getRegionAction(0);
2703
2704      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
2705      // we can assume regionAction.getAtomic() is true here.
2706      assert regionAction.getAtomic();
2707
2708      OperationQuota quota;
2709      HRegion region;
2710      RegionSpecifier regionSpecifier = regionAction.getRegion();
2711
2712      try {
2713        region = getRegion(regionSpecifier);
2714        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
2715          regionAction.hasCondition());
2716      } catch (IOException e) {
2717        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2718        return responseBuilder.build();
2719      }
2720
2721      try {
2722        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
2723        // We only allow replication in standby state and it will not set the atomic flag.
2724        if (rejectIfFromClient) {
2725          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2726            new DoNotRetryIOException(
2727              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2728          return responseBuilder.build();
2729        }
2730
2731        try {
2732          CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2733            cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
2734          responseBuilder.setProcessed(result.isSuccess());
2735          ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2736            ClientProtos.ResultOrException.newBuilder();
2737          for (int i = 0; i < regionAction.getActionCount(); i++) {
2738            // To unify the response format with doNonAtomicRegionMutation and read through
2739            // client's AsyncProcess we have to add an empty result instance per operation
2740            resultOrExceptionOrBuilder.clear();
2741            resultOrExceptionOrBuilder.setIndex(i);
2742            regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2743          }
2744        } catch (IOException e) {
2745          rpcServer.getMetrics().exception(e);
2746          // As it's an atomic operation with a condition, we may expect it's a global failure.
2747          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2748        }
2749      } finally {
2750        quota.close();
2751      }
2752
2753      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2754      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2755      if (regionLoadStats != null) {
2756        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
2757          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
2758      }
2759      return responseBuilder.build();
2760    }
2761
2762    // this will contain all the cells that we need to return. It's created later, if needed.
2763    List<ExtendedCellScannable> cellsToReturn = null;
2764    RegionScannersCloseCallBack closeCallBack = null;
2765    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2766    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
2767      new HashMap<>(request.getRegionActionCount());
2768
2769    for (RegionAction regionAction : request.getRegionActionList()) {
2770      OperationQuota quota;
2771      HRegion region;
2772      RegionSpecifier regionSpecifier = regionAction.getRegion();
2773      regionActionResultBuilder.clear();
2774
2775      try {
2776        region = getRegion(regionSpecifier);
2777        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
2778          regionAction.hasCondition());
2779      } catch (IOException e) {
2780        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2781        continue; // For this region it's a failure.
2782      }
2783
2784      try {
2785        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
2786
2787        if (regionAction.hasCondition()) {
2788          // We only allow replication in standby state and it will not set the atomic flag.
2789          if (rejectIfFromClient) {
2790            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2791              new DoNotRetryIOException(
2792                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2793            continue;
2794          }
2795
2796          try {
2797            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2798              ClientProtos.ResultOrException.newBuilder();
2799            if (regionAction.getActionCount() == 1) {
2800              CheckAndMutateResult result =
2801                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
2802                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
2803              regionActionResultBuilder.setProcessed(result.isSuccess());
2804              resultOrExceptionOrBuilder.setIndex(0);
2805              if (result.getResult() != null) {
2806                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
2807              }
2808
2809              if (result.getMetrics() != null) {
2810                resultOrExceptionOrBuilder
2811                  .setMetrics(ProtobufUtil.toQueryMetrics(result.getMetrics()));
2812              }
2813
2814              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2815            } else {
2816              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2817                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
2818              regionActionResultBuilder.setProcessed(result.isSuccess());
2819              for (int i = 0; i < regionAction.getActionCount(); i++) {
2820                if (i == 0 && result.getResult() != null) {
2821                  // Set the result of the Increment/Append operations to the first element of the
2822                  // ResultOrException list
2823                  resultOrExceptionOrBuilder.setIndex(i);
2824                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
2825                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
2826                  continue;
2827                }
2828                // To unify the response format with doNonAtomicRegionMutation and read through
2829                // client's AsyncProcess we have to add an empty result instance per operation
2830                resultOrExceptionOrBuilder.clear();
2831                resultOrExceptionOrBuilder.setIndex(i);
2832                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2833              }
2834            }
2835          } catch (IOException e) {
2836            rpcServer.getMetrics().exception(e);
2837            // As it's an atomic operation with a condition, we may expect it's a global failure.
2838            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2839          }
2840        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2841          // We only allow replication in standby state and it will not set the atomic flag.
2842          if (rejectIfFromClient) {
2843            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2844              new DoNotRetryIOException(
2845                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2846            continue;
2847          }
2848          try {
2849            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
2850              cellScanner, nonceGroup, spaceQuotaEnforcement);
2851            regionActionResultBuilder.setProcessed(true);
2852            // We no longer use MultiResponse#processed. Instead, we use
2853            // RegionActionResult#processed. This is for backward compatibility for old clients.
2854            responseBuilder.setProcessed(true);
2855          } catch (IOException e) {
2856            rpcServer.getMetrics().exception(e);
2857            // As it's atomic, we may expect it's a global failure.
2858            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2859          }
2860        } else {
2861          if (
2862            rejectIfFromClient && regionAction.getActionCount() > 0
2863              && !isReplicationRequest(regionAction.getAction(0))
2864          ) {
2865            // fail if it is not a replication request
2866            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2867              new DoNotRetryIOException(
2868                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2869            continue;
2870          }
2871          // doNonAtomicRegionMutation manages the exception internally
2872          if (context != null && closeCallBack == null) {
2873            // An RpcCallBack that creates a list of scanners that needs to perform callBack
2874            // operation on completion of multiGets.
2875            // Set this only once
2876            closeCallBack = new RegionScannersCloseCallBack();
2877            context.setCallBack(closeCallBack);
2878          }
2879          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2880            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
2881            spaceQuotaEnforcement);
2882        }
2883      } finally {
2884        quota.close();
2885      }
2886
2887      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2888      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2889      if (regionLoadStats != null) {
2890        regionStats.put(regionSpecifier, regionLoadStats);
2891      }
2892    }
2893    // Load the controller with the Cells to return.
2894    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2895      controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(cellsToReturn));
2896    }
2897
2898    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
2899    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
2900      builder.addRegion(stat.getKey());
2901      builder.addStat(stat.getValue());
2902    }
2903    responseBuilder.setRegionStatistics(builder);
2904    return responseBuilder.build();
2905  }
2906
2907  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2908    if (cellScanner == null) {
2909      return;
2910    }
2911    for (Action action : actions) {
2912      skipCellsForMutation(action, cellScanner);
2913    }
2914  }
2915
2916  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2917    if (cellScanner == null) {
2918      return;
2919    }
2920    try {
2921      if (action.hasMutation()) {
2922        MutationProto m = action.getMutation();
2923        if (m.hasAssociatedCellCount()) {
2924          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2925            cellScanner.advance();
2926          }
2927        }
2928      }
2929    } catch (IOException e) {
2930      // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2931      // marked as failed as we could not see the Region here. At client side the top level
2932      // RegionAction exception will be considered first.
2933      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2934    }
2935  }
2936
2937  /**
2938   * Mutate data in a table.
2939   * @param rpcc    the RPC controller
2940   * @param request the mutate request
2941   */
2942  @Override
2943  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
2944    throws ServiceException {
2945    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2946    // It is also the conduit via which we pass back data.
2947    HBaseRpcController controller = (HBaseRpcController) rpcc;
2948    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2949    OperationQuota quota = null;
2950    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2951    // Clear scanner so we are not holding on to reference across call.
2952    if (controller != null) {
2953      controller.setCellScanner(null);
2954    }
2955    try {
2956      checkOpen();
2957      requestCount.increment();
2958      rpcMutateRequestCount.increment();
2959      HRegion region = getRegion(request.getRegion());
2960      rejectIfInStandByState(region);
2961      MutateResponse.Builder builder = MutateResponse.newBuilder();
2962      MutationProto mutation = request.getMutation();
2963      if (!region.getRegionInfo().isMetaRegion()) {
2964        server.getMemStoreFlusher().reclaimMemStoreMemory();
2965      }
2966      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2967      OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request);
2968      quota = getRpcQuotaManager().checkBatchQuota(region, operationType);
2969      ActivePolicyEnforcement spaceQuotaEnforcement =
2970        getSpaceQuotaManager().getActiveEnforcements();
2971
2972      if (request.hasCondition()) {
2973        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
2974          request.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
2975        builder.setProcessed(result.isSuccess());
2976        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
2977        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
2978        if (clientCellBlockSupported) {
2979          addSize(context, result.getResult());
2980        }
2981        if (result.getMetrics() != null) {
2982          builder.setMetrics(ProtobufUtil.toQueryMetrics(result.getMetrics()));
2983        }
2984      } else {
2985        Result r = null;
2986        Boolean processed = null;
2987        MutationType type = mutation.getMutateType();
2988        switch (type) {
2989          case APPEND:
2990            // TODO: this doesn't actually check anything.
2991            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
2992              context);
2993            break;
2994          case INCREMENT:
2995            // TODO: this doesn't actually check anything.
2996            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
2997              context);
2998            break;
2999          case PUT:
3000            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
3001            processed = Boolean.TRUE;
3002            break;
3003          case DELETE:
3004            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
3005            processed = Boolean.TRUE;
3006            break;
3007          default:
3008            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
3009        }
3010        if (processed != null) {
3011          builder.setProcessed(processed);
3012        }
3013        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
3014        addResult(builder, r, controller, clientCellBlockSupported);
3015        if (clientCellBlockSupported) {
3016          addSize(context, r);
3017        }
3018      }
3019      return builder.build();
3020    } catch (IOException ie) {
3021      server.checkFileSystem();
3022      throw new ServiceException(ie);
3023    } finally {
3024      if (quota != null) {
3025        quota.close();
3026      }
3027    }
3028  }
3029
3030  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
3031    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
3032    long before = EnvironmentEdgeManager.currentTime();
3033    Put put = ProtobufUtil.toPut(mutation, cellScanner);
3034    checkCellSizeLimit(region, put);
3035    spaceQuota.getPolicyEnforcement(region).check(put);
3036    quota.addMutation(put);
3037    region.put(put);
3038
3039    MetricsRegionServer metricsRegionServer = server.getMetrics();
3040    if (metricsRegionServer != null) {
3041      long after = EnvironmentEdgeManager.currentTime();
3042      metricsRegionServer.updatePut(region, after - before);
3043    }
3044  }
3045
3046  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
3047    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
3048    long before = EnvironmentEdgeManager.currentTime();
3049    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
3050    checkCellSizeLimit(region, delete);
3051    spaceQuota.getPolicyEnforcement(region).check(delete);
3052    quota.addMutation(delete);
3053    region.delete(delete);
3054
3055    MetricsRegionServer metricsRegionServer = server.getMetrics();
3056    if (metricsRegionServer != null) {
3057      long after = EnvironmentEdgeManager.currentTime();
3058      metricsRegionServer.updateDelete(region, after - before);
3059    }
3060  }
3061
  /**
   * Run a single-mutation check-and-mutate against the region, honoring coprocessor pre/post
   * hooks and updating checkAndMutate metrics.
   * @param region      target region
   * @param quota       operation quota the mutation is charged against
   * @param mutation    the proto mutation to apply when the condition matches
   * @param cellScanner carries the mutation's cells when they came in via the cell block
   * @param condition   the condition to check before applying the mutation
   * @param nonceGroup  client nonce group (HConstants.NO_NONCE when the request had none)
   * @param spaceQuota  active space-quota policies to validate the mutation against
   * @param context     current RPC call context, may be null; used for block-bytes-scanned metrics
   * @return the result of the check-and-mutate
   * @throws IOException if quota/size checks fail or the region operation fails
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
    CheckAndMutateResult checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
    quota.addMutation((Mutation) checkAndMutate.getAction());

    CheckAndMutateResult result = null;
    // The pre-hook may produce a result itself; if it does, the core region call (and the
    // post-hook) are skipped entirely.
    if (region.getCoprocessorHost() != null) {
      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
    }
    if (result == null) {
      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
      if (region.getCoprocessorHost() != null) {
        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
      }
    }
    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      long blockBytesScanned =
        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
      metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned);

      // checkAndPut / checkAndDelete latencies are additionally tracked per mutation type.
      MutationType type = mutation.getMutateType();
      switch (type) {
        case PUT:
          metricsRegionServer.updateCheckAndPut(region, after - before);
          break;
        case DELETE:
          metricsRegionServer.updateCheckAndDelete(region, after - before);
          break;
        default:
          break;
      }
    }
    return result;
  }
3104
  // This is used to keep compatible with the old client implementation. Consider remove it if we
  // decide to drop the support of the client that still sends close request to a region scanner
  // which has already been exhausted.
  @Deprecated
  private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {

    private static final long serialVersionUID = -4305297078988180130L;

    // Shared singleton exception: suppress stack trace capture so throwing it repeatedly is cheap
    // and does not carry a misleading stack from where the instance was constructed.
    @Override
    public synchronized Throwable fillInStackTrace() {
      return this;
    }
  };
3118
  /**
   * Resolve the scanner id in the request to its live RegionScannerHolder.
   * <p>
   * Missing scanners are handled in two ways: a recently closed scanner (tracked in
   * {@code closedScanners}) gets either SCANNER_ALREADY_CLOSED (legitimate one-extra call after
   * exhaustion) or an OutOfOrderScannerNextException (bad retry); an unknown scanner id gets an
   * UnknownScannerException. A scanner whose region instance no longer matches the online region
   * is closed, its lease cancelled, and NotServingRegionException thrown.
   * @param request the scan request carrying the scanner id (and, optionally, nextCallSeq)
   * @return the holder for the live scanner
   * @throws IOException if the scanner is gone, out of order, or its region has moved
   */
  private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
    String scannerName = toScannerName(request.getScannerId());
    RegionScannerHolder rsh = this.scanners.get(scannerName);
    if (rsh == null) {
      // just ignore the next or close request if scanner does not exists.
      Long lastCallSeq = closedScanners.getIfPresent(scannerName);
      if (lastCallSeq != null) {
        // Check the sequence number to catch if the last call was incorrectly retried.
        // The only allowed scenario is when the scanner is exhausted and one more scan
        // request arrives - in this case returning 0 rows is correct.
        if (request.hasNextCallSeq() && request.getNextCallSeq() != lastCallSeq + 1) {
          throw new OutOfOrderScannerNextException("Expected nextCallSeq for closed request: "
            + (lastCallSeq + 1) + " But the nextCallSeq got from client: "
            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
        } else {
          throw SCANNER_ALREADY_CLOSED;
        }
      } else {
        LOG.warn("Client tried to access missing scanner " + scannerName);
        throw new UnknownScannerException(
          "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
            + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
            + "long wait between consecutive client checkins, c) Server may be closing down, "
            + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
            + "possible fix would be increasing the value of"
            + "'hbase.client.scanner.timeout.period' configuration.");
      }
    }
    rejectIfInStandByState(rsh.r);
    RegionInfo hri = rsh.s.getRegionInfo();
    // Yes, should be the same instance
    if (server.getOnlineRegion(hri.getRegionName()) != rsh.r) {
      String msg = "Region has changed on the scanner " + scannerName + ": regionName="
        + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r;
      LOG.warn(msg + ", closing...");
      scanners.remove(scannerName);
      try {
        rsh.s.close();
      } catch (IOException e) {
        LOG.warn("Getting exception closing " + scannerName, e);
      } finally {
        // Best-effort lease cleanup; the lease may already be gone.
        try {
          server.getLeaseManager().cancelLease(scannerName);
        } catch (LeaseException e) {
          LOG.warn("Getting exception closing " + scannerName, e);
        }
      }
      throw new NotServingRegionException(msg);
    }
    return rsh;
  }
3170
3171  /**
3172   * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
3173   *         value.
3174   */
3175  private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request, HRegion region,
3176    ScanResponse.Builder builder) throws IOException {
3177    rejectIfInStandByState(region);
3178    ClientProtos.Scan protoScan = request.getScan();
3179    boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
3180    Scan scan = ProtobufUtil.toScan(protoScan);
3181    // if the request doesn't set this, get the default region setting.
3182    if (!isLoadingCfsOnDemandSet) {
3183      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
3184    }
3185
3186    if (!scan.hasFamilies()) {
3187      // Adding all families to scanner
3188      for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
3189        scan.addFamily(family);
3190      }
3191    }
3192    if (region.getCoprocessorHost() != null) {
3193      // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
3194      // wrapper for the core created RegionScanner
3195      region.getCoprocessorHost().preScannerOpen(scan);
3196    }
3197    RegionScannerImpl coreScanner = region.getScanner(scan);
3198    Shipper shipper = coreScanner;
3199    RegionScanner scanner = coreScanner;
3200    try {
3201      if (region.getCoprocessorHost() != null) {
3202        scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
3203      }
3204    } catch (Exception e) {
3205      // Although region coprocessor is for advanced users and they should take care of the
3206      // implementation to not damage the HBase system, closing the scanner on exception here does
3207      // not have any bad side effect, so let's do it
3208      scanner.close();
3209      throw e;
3210    }
3211    long scannerId = scannerIdGenerator.generateNewScannerId();
3212    builder.setScannerId(scannerId);
3213    builder.setMvccReadPoint(scanner.getMvccReadPoint());
3214    builder.setTtl(scannerLeaseTimeoutPeriod);
3215    String scannerName = toScannerName(scannerId);
3216
3217    boolean fullRegionScan =
3218      !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region);
3219
3220    return new Pair<String, RegionScannerHolder>(scannerName,
3221      addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan));
3222  }
3223
3224  /**
3225   * The returned String is used as key doing look up of outstanding Scanners in this Servers'
3226   * this.scanners, the Map of outstanding scanners and their current state.
3227   * @param scannerId A scanner long id.
3228   * @return The long id as a String.
3229   */
3230  private static String toScannerName(long scannerId) {
3231    return Long.toString(scannerId);
3232  }
3233
3234  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
3235    throws OutOfOrderScannerNextException {
3236    // if nextCallSeq does not match throw Exception straight away. This needs to be
3237    // performed even before checking of Lease.
3238    // See HBASE-5974
3239    if (request.hasNextCallSeq()) {
3240      long callSeq = request.getNextCallSeq();
3241      if (!rsh.incNextCallSeq(callSeq)) {
3242        throw new OutOfOrderScannerNextException(
3243          "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: "
3244            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request)
3245            + "; region=" + rsh.r.getRegionInfo().getRegionNameAsString());
3246      }
3247    }
3248  }
3249
  /**
   * Re-add the given scanner lease to the lease manager. A LeaseStillHeldException is treated as
   * impossible (scanner ids are unique) and converted to an AssertionError.
   */
  private void addScannerLeaseBack(LeaseManager.Lease lease) {
    try {
      server.getLeaseManager().addLease(lease);
    } catch (LeaseStillHeldException e) {
      // should not happen as the scanner id is unique.
      throw new AssertionError(e);
    }
  }
3258
3259  // visible for testing only
3260  long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller,
3261    boolean allowHeartbeatMessages) {
3262    // Set the time limit to be half of the more restrictive timeout value (one of the
3263    // timeout values must be positive). In the event that both values are positive, the
3264    // more restrictive of the two is used to calculate the limit.
3265    if (allowHeartbeatMessages) {
3266      long now = EnvironmentEdgeManager.currentTime();
3267      long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now);
3268      if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) {
3269        long timeLimitDelta;
3270        if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) {
3271          timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout);
3272        } else {
3273          timeLimitDelta =
3274            scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout;
3275        }
3276
3277        // Use half of whichever timeout value was more restrictive... But don't allow
3278        // the time limit to be less than the allowable minimum (could cause an
3279        // immediate timeout before scanning any data).
3280        timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
3281        return now + timeLimitDelta;
3282      }
3283    }
3284    // Default value of timeLimit is negative to indicate no timeLimit should be
3285    // enforced.
3286    return -1L;
3287  }
3288
3289  private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) {
3290    long timeout;
3291    if (controller != null && controller.getCallTimeout() > 0) {
3292      timeout = controller.getCallTimeout();
3293    } else if (rpcTimeout > 0) {
3294      timeout = rpcTimeout;
3295    } else {
3296      return -1;
3297    }
3298    if (call != null) {
3299      timeout -= (now - call.getReceiveTime());
3300    }
3301    // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high.
3302    // return minimum value here in that case so we count this in calculating the final delta.
3303    return Math.max(minimumScanTimeLimitDelta, timeout);
3304  }
3305
3306  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
3307    ScannerContext scannerContext, ScanResponse.Builder builder) {
3308    if (numOfCompleteRows >= limitOfRows) {
3309      if (LOG.isTraceEnabled()) {
3310        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows
3311          + " scannerContext: " + scannerContext);
3312      }
3313      builder.setMoreResults(false);
3314    }
3315  }
3316
  /**
   * Run the scan loop for one ScanRequest: repeatedly call {@link RegionScanner#nextRaw},
   * accumulating Results into {@code results} until a size/time/row limit is reached or the region
   * is exhausted, then record the outcome (moreResultsInRegion, heartbeat/cursor, query metrics)
   * into {@code builder}.
   * @param controller         RPC controller; used to derive this call's time limit
   * @param request            the originating ScanRequest (client capabilities, metrics flags)
   * @param rsh                holder of the region scanner plus per-scanner bookkeeping
   * @param maxQuotaResultSize maximum result size from server config and quotas; also used for
   *                           block-size based limits
   * @param maxResults         maximum number of Results for this call (scan caching)
   * @param limitOfRows        close the scan after this many complete rows; <= 0 means unlimited
   * @param results            output list; may already contain Results added by the coprocessor
   *                           preScannerNext hook
   * @param builder            response builder receiving moreResults/heartbeat/cursor/metrics
   * @param rpcCall            the current RPC call context, or null when not invoked via RPC
   * @param scanMetrics        server-side scan metrics accumulator, or null if not requested
   * @throws IOException if the underlying scanner fails
   */
  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
    long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
    ScanResponse.Builder builder, RpcCall rpcCall, ServerSideScanMetrics scanMetrics)
    throws IOException {
    HRegion region = rsh.r;
    RegionScanner scanner = rsh.s;
    // Effective cell-size limit: the smaller of the scanner's own max result size (if set > 0)
    // and the quota/config-derived maximum.
    long maxResultSize;
    if (scanner.getMaxResultSize() > 0) {
      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
    } else {
      maxResultSize = maxQuotaResultSize;
    }
    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
    // arbitrary 32. TODO: keep record of general size of results being returned.
    ArrayList<Cell> values = new ArrayList<>(32);
    region.startRegionOperation(Operation.SCAN);
    long before = EnvironmentEdgeManager.currentTime();
    // Used to check if we've matched the row limit set on the Scan
    int numOfCompleteRows = 0;
    // Count of times we call nextRaw; can be > numOfCompleteRows.
    int numOfNextRawCalls = 0;
    try {
      int numOfResults = 0;
      // Serialize access to the scanner: only one RPC may drive it at a time.
      synchronized (scanner) {
        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
        boolean clientHandlesPartials =
          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
        boolean clientHandlesHeartbeats =
          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();

        // On the server side we must ensure that the correct ordering of partial results is
        // returned to the client to allow them to properly reconstruct the partial results.
        // If the coprocessor host is adding to the result list, we cannot guarantee the
        // correct ordering of partial results and so we prevent partial results from being
        // formed.
        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
        boolean moreRows = false;

        // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
        // certain time threshold on the server. When the time threshold is exceeded, the
        // server stops the scan and sends back whatever Results it has accumulated within
        // that time period (may be empty). Since heartbeat messages have the potential to
        // create partial Results (in the event that the timeout occurs in the middle of a
        // row), we must only generate heartbeat messages when the client can handle both
        // heartbeats AND partials
        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;

        long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);

        // When partials/heartbeats are allowed, limits may cut a row mid-way (BETWEEN_CELLS);
        // otherwise limits are only enforced at row boundaries (BETWEEN_ROWS).
        final LimitScope sizeScope =
          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
        final LimitScope timeScope =
          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;

        // Configure with limits for this RPC. Set keep progress true since size progress
        // towards size limit should be kept between calls to nextRaw
        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
        // of heap size occupied by cells(being read). Cell data means its key and value parts.
        // maxQuotaResultSize - max results just from server side configuration and quotas, without
        // user's specified max. We use this for evaluating limits based on blocks (not cells).
        // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
        // cell or block size from maximum here so we adhere to total limits of request.
        // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
        // have accumulated block bytes. If not, this will be 0 for block size.
        long maxCellSize = maxResultSize;
        long maxBlockSize = maxQuotaResultSize;
        if (rpcCall != null) {
          maxBlockSize -= rpcCall.getBlockBytesScanned();
          maxCellSize -= rpcCall.getResponseCellSize();
        }

        contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
        contextBuilder.setBatchLimit(scanner.getBatch());
        contextBuilder.setTimeLimit(timeScope, timeLimit);
        contextBuilder.setTrackMetrics(scanMetrics != null);
        contextBuilder.setScanMetrics(scanMetrics);
        ScannerContext scannerContext = contextBuilder.build();
        boolean limitReached = false;
        long blockBytesScannedBefore = 0;
        while (numOfResults < maxResults) {
          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
          // batch limit is a limit on the number of cells per Result. Thus, if progress is
          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
          // reset the batch progress between nextRaw invocations since we don't want the
          // batch progress from previous calls to affect future calls
          scannerContext.setBatchProgress(0);
          assert values.isEmpty();

          // Collect values to be returned here
          moreRows = scanner.nextRaw(values, scannerContext);

          // Per-nextRaw delta of block bytes scanned; used for per-Result query metrics below.
          long blockBytesScanned = scannerContext.getBlockSizeProgress() - blockBytesScannedBefore;
          blockBytesScannedBefore = scannerContext.getBlockSizeProgress();

          if (rpcCall == null) {
            // When there is no RpcCallContext,copy EC to heap, then the scanner would close,
            // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
            // buffers.See more details in HBASE-26036.
            CellUtil.cloneIfNecessary(values);
          }
          numOfNextRawCalls++;

          if (!values.isEmpty()) {
            if (limitOfRows > 0) {
              // First we need to check if the last result is partial and we have a row change. If
              // so then we need to increase the numOfCompleteRows.
              if (results.isEmpty()) {
                if (
                  rsh.rowOfLastPartialResult != null
                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              } else {
                Result lastResult = results.get(results.size() - 1);
                if (
                  lastResult.mayHaveMoreCellsInRow()
                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              }
              // checkLimitOfRows sets moreResults=false when the row limit is reached.
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);

            if (request.getScan().getQueryMetricsEnabled()) {
              builder.addQueryMetrics(ClientProtos.QueryMetrics.newBuilder()
                .setBlockBytesScanned(blockBytesScanned).build());
            }

            results.add(r);
            numOfResults++;
            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
              numOfCompleteRows++;
              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
          } else if (!moreRows && !results.isEmpty()) {
            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
            // last result is false. Otherwise it's possible that: the first nextRaw returned
            // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
            // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
            // return with moreRows=false, which means moreResultsInRegion would be false, it will
            // be a contradictory state (HBASE-21206).
            int lastIdx = results.size() - 1;
            Result r = results.get(lastIdx);
            if (r.mayHaveMoreCellsInRow()) {
              results.set(lastIdx, ClientInternalHelper.createResult(
                ClientInternalHelper.getExtendedRawCells(r), r.getExists(), r.isStale(), false));
            }
          }
          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
          boolean resultsLimitReached = numOfResults >= maxResults;
          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

          if (limitReached || !moreRows) {
            // With block size limit, we may exceed size limit without collecting any results.
            // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
            // or cursor if results were collected, for example for cell size or heap size limits.
            boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
            // We only want to mark a ScanResponse as a heartbeat message in the event that
            // there are more values to be read server side. If there aren't more values,
            // marking it as a heartbeat is wasteful because the client will need to issue
            // another ScanRequest only to realize that they already have all the values
            if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
              // Heartbeat messages occur when the time limit has been reached, or size limit has
              // been reached before collecting any results. This can happen for heavily filtered
              // scans which scan over too many blocks.
              builder.setHeartbeatMessage(true);
              if (rsh.needCursor) {
                Cell cursorCell = scannerContext.getLastPeekedCell();
                if (cursorCell != null) {
                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
                }
              }
            }
            break;
          }
          values.clear();
        }
        if (rpcCall != null) {
          // Charge the heap size of everything read in this call against the RPC's response size.
          rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
        }
        builder.setMoreResultsInRegion(moreRows);
        // Check to see if the client requested that we track metrics server side. If the
        // client requested metrics, retrieve the metrics from the scanner context.
        if (scanMetrics != null) {
          // rather than increment yet another counter in StoreScanner, just set the value here
          // from block size progress before writing into the response
          scanMetrics.setCounter(ServerSideScanMetrics.BLOCK_BYTES_SCANNED_KEY_METRIC_NAME,
            scannerContext.getBlockSizeProgress());
        }
      }
    } finally {
      region.closeRegionOperation();
      // Update serverside metrics, even on error.
      long end = EnvironmentEdgeManager.currentTime();

      long responseCellSize = 0;
      long blockBytesScanned = 0;
      if (rpcCall != null) {
        responseCellSize = rpcCall.getResponseCellSize();
        blockBytesScanned = rpcCall.getBlockBytesScanned();
        rsh.updateBlockBytesScanned(blockBytesScanned);
      }
      region.getMetrics().updateScan();
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned);
        metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls);
      }
    }
    // coprocessor postNext hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
    }
  }
3549
3550  /**
3551   * Scan data in a table.
3552   * @param controller the RPC controller
3553   * @param request    the scan request
3554   */
3555  @Override
3556  public ScanResponse scan(final RpcController controller, final ScanRequest request)
3557    throws ServiceException {
3558    if (controller != null && !(controller instanceof HBaseRpcController)) {
3559      throw new UnsupportedOperationException(
3560        "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
3561    }
3562    if (!request.hasScannerId() && !request.hasScan()) {
3563      throw new ServiceException(
3564        new DoNotRetryIOException("Missing required input: scannerId or scan"));
3565    }
3566    try {
3567      checkOpen();
3568    } catch (IOException e) {
3569      if (request.hasScannerId()) {
3570        String scannerName = toScannerName(request.getScannerId());
3571        if (LOG.isDebugEnabled()) {
3572          LOG.debug(
3573            "Server shutting down and client tried to access missing scanner " + scannerName);
3574        }
3575        final LeaseManager leaseManager = server.getLeaseManager();
3576        if (leaseManager != null) {
3577          try {
3578            leaseManager.cancelLease(scannerName);
3579          } catch (LeaseException le) {
3580            // No problem, ignore
3581            if (LOG.isTraceEnabled()) {
3582              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
3583            }
3584          }
3585        }
3586      }
3587      throw new ServiceException(e);
3588    }
3589    boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
3590    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(trackMetrics);
3591    if (trackMetrics) {
3592      ThreadLocalServerSideScanMetrics.reset();
3593    }
3594    requestCount.increment();
3595    rpcScanRequestCount.increment();
3596    RegionScannerContext rsx;
3597    ScanResponse.Builder builder = ScanResponse.newBuilder();
3598    try {
3599      rsx = checkQuotaAndGetRegionScannerContext(request, builder);
3600    } catch (IOException e) {
3601      if (e == SCANNER_ALREADY_CLOSED) {
3602        // Now we will close scanner automatically if there are no more results for this region but
3603        // the old client will still send a close request to us. Just ignore it and return.
3604        return builder.build();
3605      }
3606      throw new ServiceException(e);
3607    }
3608    String scannerName = rsx.scannerName;
3609    RegionScannerHolder rsh = rsx.holder;
3610    OperationQuota quota = rsx.quota;
3611    if (rsh.fullRegionScan) {
3612      rpcFullScanRequestCount.increment();
3613    }
3614    HRegion region = rsh.r;
3615    LeaseManager.Lease lease;
3616    try {
3617      // Remove lease while its being processed in server; protects against case
3618      // where processing of request takes > lease expiration time. or null if none found.
3619      lease = server.getLeaseManager().removeLease(scannerName);
3620    } catch (LeaseException e) {
3621      throw new ServiceException(e);
3622    }
3623    if (request.hasRenew() && request.getRenew()) {
3624      // add back and return
3625      addScannerLeaseBack(lease);
3626      try {
3627        checkScanNextCallSeq(request, rsh);
3628      } catch (OutOfOrderScannerNextException e) {
3629        throw new ServiceException(e);
3630      }
3631      return builder.build();
3632    }
3633    try {
3634      checkScanNextCallSeq(request, rsh);
3635    } catch (OutOfOrderScannerNextException e) {
3636      addScannerLeaseBack(lease);
3637      throw new ServiceException(e);
3638    }
3639    // Now we have increased the next call sequence. If we give client an error, the retry will
3640    // never success. So we'd better close the scanner and return a DoNotRetryIOException to client
3641    // and then client will try to open a new scanner.
3642    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
3643    int rows; // this is scan.getCaching
3644    if (request.hasNumberOfRows()) {
3645      rows = request.getNumberOfRows();
3646    } else {
3647      rows = closeScanner ? 0 : 1;
3648    }
3649    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
3650    // now let's do the real scan.
3651    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
3652    RegionScanner scanner = rsh.s;
3653    // this is the limit of rows for this scan, if we the number of rows reach this value, we will
3654    // close the scanner.
3655    int limitOfRows;
3656    if (request.hasLimitOfRows()) {
3657      limitOfRows = request.getLimitOfRows();
3658    } else {
3659      limitOfRows = -1;
3660    }
3661    boolean scannerClosed = false;
3662    try {
3663      List<Result> results = new ArrayList<>(Math.min(rows, 512));
3664      ServerSideScanMetrics scanMetrics = trackMetrics ? new ServerSideScanMetrics() : null;
3665      if (rows > 0) {
3666        boolean done = false;
3667        // Call coprocessor. Get region info from scanner.
3668        if (region.getCoprocessorHost() != null) {
3669          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
3670          if (!results.isEmpty()) {
3671            for (Result r : results) {
3672              // add cell size from CP results so we can track response size and update limits
3673              // when calling scan below if !done. We'll also have tracked block size if the CP
3674              // got results from hbase, since StoreScanner tracks that for all calls automatically.
3675              addSize(rpcCall, r);
3676            }
3677          }
3678          if (bypass != null && bypass.booleanValue()) {
3679            done = true;
3680          }
3681        }
3682        if (!done) {
3683          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
3684            results, builder, rpcCall, scanMetrics);
3685        } else {
3686          builder.setMoreResultsInRegion(!results.isEmpty());
3687        }
3688      } else {
3689        // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
3690        builder.setMoreResultsInRegion(true);
3691      }
3692
3693      quota.addScanResult(results);
3694      addResults(builder, results, (HBaseRpcController) controller,
3695        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
3696        isClientCellBlockSupport(rpcCall));
3697      if (scanner.isFilterDone() && results.isEmpty()) {
3698        // If the scanner's filter - if any - is done with the scan
3699        // only set moreResults to false if the results is empty. This is used to keep compatible
3700        // with the old scan implementation where we just ignore the returned results if moreResults
3701        // is false. Can remove the isEmpty check after we get rid of the old implementation.
3702        builder.setMoreResults(false);
3703      }
3704      // Later we may close the scanner depending on this flag so here we need to make sure that we
3705      // have already set this flag.
3706      assert builder.hasMoreResultsInRegion();
3707      // we only set moreResults to false in the above code, so set it to true if we haven't set it
3708      // yet.
3709      if (!builder.hasMoreResults()) {
3710        builder.setMoreResults(true);
3711      }
3712      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
3713        // Record the last cell of the last result if it is a partial result
3714        // We need this to calculate the complete rows we have returned to client as the
3715        // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
3716        // current row. We may filter out all the remaining cells for the current row and just
3717        // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
3718        // check for row change.
3719        Result lastResult = results.get(results.size() - 1);
3720        if (lastResult.mayHaveMoreCellsInRow()) {
3721          rsh.rowOfLastPartialResult = lastResult.getRow();
3722        } else {
3723          rsh.rowOfLastPartialResult = null;
3724        }
3725      }
3726      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
3727        scannerClosed = true;
3728        closeScanner(region, scanner, scannerName, rpcCall, false);
3729      }
3730
3731      // There's no point returning to a timed out client. Throwing ensures scanner is closed
3732      if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) {
3733        throw new TimeoutIOException("Client deadline exceeded, cannot return results");
3734      }
3735
3736      if (scanMetrics != null) {
3737        if (rpcCall != null) {
3738          long rpcScanTime = EnvironmentEdgeManager.currentTime() - rpcCall.getStartTime();
3739          long rpcQueueWaitTime = rpcCall.getStartTime() - rpcCall.getReceiveTime();
3740          scanMetrics.addToCounter(ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME,
3741            rpcScanTime);
3742          scanMetrics.addToCounter(ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME,
3743            rpcQueueWaitTime);
3744        }
3745        ThreadLocalServerSideScanMetrics.populateServerSideScanMetrics(scanMetrics);
3746        Map<String, Long> metrics = scanMetrics.getMetricsMap();
3747        ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
3748        NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
3749
3750        for (Entry<String, Long> entry : metrics.entrySet()) {
3751          pairBuilder.setName(entry.getKey());
3752          pairBuilder.setValue(entry.getValue());
3753          metricBuilder.addMetrics(pairBuilder.build());
3754        }
3755
3756        builder.setScanMetrics(metricBuilder.build());
3757      }
3758
3759      return builder.build();
3760    } catch (IOException e) {
3761      try {
3762        // scanner is closed here
3763        scannerClosed = true;
3764        // The scanner state might be left in a dirty state, so we will tell the Client to
3765        // fail this RPC and close the scanner while opening up another one from the start of
3766        // row that the client has last seen.
3767        closeScanner(region, scanner, scannerName, rpcCall, true);
3768
3769        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
3770        // used in two different semantics.
3771        // (1) The first is to close the client scanner and bubble up the exception all the way
3772        // to the application. This is preferred when the exception is really un-recoverable
3773        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
3774        // bucket usually.
3775        // (2) Second semantics is to close the current region scanner only, but continue the
3776        // client scanner by overriding the exception. This is usually UnknownScannerException,
3777        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
3778        // application-level ClientScanner has to continue without bubbling up the exception to
3779        // the client. See ClientScanner code to see how it deals with these special exceptions.
3780        if (e instanceof DoNotRetryIOException) {
3781          throw e;
3782        }
3783
3784        // If it is a FileNotFoundException, wrap as a
3785        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
3786        if (e instanceof FileNotFoundException) {
3787          throw new DoNotRetryIOException(e);
3788        }
3789
3790        // We closed the scanner already. Instead of throwing the IOException, and client
3791        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
3792        // a special exception to save an RPC.
3793        if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
3794          // 1.4.0+ clients know how to handle
3795          throw new ScannerResetException("Scanner is closed on the server-side", e);
3796        } else {
3797          // older clients do not know about SRE. Just throw USE, which they will handle
3798          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
3799            + " scanner state for clients older than 1.3.", e);
3800        }
3801      } catch (IOException ioe) {
3802        throw new ServiceException(ioe);
3803      }
3804    } finally {
3805      if (!scannerClosed) {
3806        // Adding resets expiration time on lease.
3807        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
3808        if (rpcCall != null) {
3809          rpcCall.setCallBack(rsh.shippedCallback);
3810        } else {
3811          // If context is null,here we call rsh.shippedCallback directly to reuse the logic in
3812          // rsh.shippedCallback to release the internal resources in rsh,and lease is also added
3813          // back to regionserver's LeaseManager in rsh.shippedCallback.
3814          runShippedCallback(rsh);
3815        }
3816      }
3817      quota.close();
3818    }
3819  }
3820
3821  private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
3822    assert rsh.shippedCallback != null;
3823    try {
3824      rsh.shippedCallback.run();
3825    } catch (IOException ioe) {
3826      throw new ServiceException(ioe);
3827    }
3828  }
3829
3830  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
3831    RpcCallContext context, boolean isError) throws IOException {
3832    if (region.getCoprocessorHost() != null) {
3833      if (region.getCoprocessorHost().preScannerClose(scanner)) {
3834        // bypass the actual close.
3835        return;
3836      }
3837    }
3838    RegionScannerHolder rsh = scanners.remove(scannerName);
3839    if (rsh != null) {
3840      if (context != null) {
3841        context.setCallBack(rsh.closeCallBack);
3842      } else {
3843        rsh.s.close();
3844      }
3845      if (region.getCoprocessorHost() != null) {
3846        region.getCoprocessorHost().postScannerClose(scanner);
3847      }
3848      if (!isError) {
3849        closedScanners.put(scannerName, rsh.getNextCallSeq());
3850      }
3851    }
3852  }
3853
3854  @Override
3855  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
3856    CoprocessorServiceRequest request) throws ServiceException {
3857    rpcPreCheck("execRegionServerService");
3858    return server.execRegionServerService(controller, request);
3859  }
3860
3861  @Override
3862  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
3863    GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
3864    try {
3865      final RegionServerSpaceQuotaManager manager = server.getRegionServerSpaceQuotaManager();
3866      final GetSpaceQuotaSnapshotsResponse.Builder builder =
3867        GetSpaceQuotaSnapshotsResponse.newBuilder();
3868      if (manager != null) {
3869        final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
3870        for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
3871          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
3872            .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
3873            .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build());
3874        }
3875      }
3876      return builder.build();
3877    } catch (Exception e) {
3878      throw new ServiceException(e);
3879    }
3880  }
3881
3882  @Override
3883  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
3884    ClearRegionBlockCacheRequest request) throws ServiceException {
3885    try {
3886      rpcPreCheck("clearRegionBlockCache");
3887      ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder();
3888      CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
3889      server.getRegionServerCoprocessorHost().preClearRegionBlockCache();
3890      List<HRegion> regions = getRegions(request.getRegionList(), stats);
3891      for (HRegion region : regions) {
3892        try {
3893          stats = stats.append(this.server.clearRegionBlockCache(region));
3894        } catch (Exception e) {
3895          stats.addException(region.getRegionInfo().getRegionName(), e);
3896        }
3897      }
3898      stats.withMaxCacheSize(server.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
3899      server.getRegionServerCoprocessorHost().postClearRegionBlockCache(stats.build());
3900      return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
3901    } catch (IOException e) {
3902      throw new ServiceException(e);
3903    }
3904  }
3905
3906  private void executeOpenRegionProcedures(OpenRegionRequest request,
3907    Map<TableName, TableDescriptor> tdCache) {
3908    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
3909    long initiatingMasterActiveTime =
3910      request.hasInitiatingMasterActiveTime() ? request.getInitiatingMasterActiveTime() : -1;
3911    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
3912      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
3913      TableName tableName = regionInfo.getTable();
3914      TableDescriptor tableDesc = tdCache.get(tableName);
3915      if (tableDesc == null) {
3916        try {
3917          tableDesc = server.getTableDescriptors().get(regionInfo.getTable());
3918        } catch (IOException e) {
3919          // Here we do not fail the whole method since we also need deal with other
3920          // procedures, and we can not ignore this one, so we still schedule a
3921          // AssignRegionHandler and it will report back to master if we still can not get the
3922          // TableDescriptor.
3923          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
3924            regionInfo.getTable(), e);
3925        }
3926        if (tableDesc != null) {
3927          tdCache.put(tableName, tableDesc);
3928        }
3929      }
3930      if (regionOpenInfo.getFavoredNodesCount() > 0) {
3931        server.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
3932          regionOpenInfo.getFavoredNodesList());
3933      }
3934      long procId = regionOpenInfo.getOpenProcId();
3935      if (server.submitRegionProcedure(procId)) {
3936        server.getExecutorService().submit(AssignRegionHandler.create(server, regionInfo, procId,
3937          tableDesc, masterSystemTime, initiatingMasterActiveTime));
3938      }
3939    }
3940  }
3941
3942  private void executeCloseRegionProcedures(CloseRegionRequest request) {
3943    String encodedName;
3944    long initiatingMasterActiveTime =
3945      request.hasInitiatingMasterActiveTime() ? request.getInitiatingMasterActiveTime() : -1;
3946    try {
3947      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
3948    } catch (DoNotRetryIOException e) {
3949      throw new UncheckedIOException("Should not happen", e);
3950    }
3951    ServerName destination = request.hasDestinationServer()
3952      ? ProtobufUtil.toServerName(request.getDestinationServer())
3953      : null;
3954    long procId = request.getCloseProcId();
3955    boolean evictCache = request.getEvictCache();
3956    if (server.submitRegionProcedure(procId)) {
3957      server.getExecutorService().submit(UnassignRegionHandler.create(server, encodedName, procId,
3958        false, destination, evictCache, initiatingMasterActiveTime));
3959    }
3960  }
3961
3962  private void executeProcedures(RemoteProcedureRequest request) {
3963    RSProcedureCallable callable;
3964    try {
3965      callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
3966        .getDeclaredConstructor().newInstance();
3967    } catch (Exception e) {
3968      LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(),
3969        request.getProcId(), e);
3970      server.remoteProcedureComplete(request.getProcId(), request.getInitiatingMasterActiveTime(),
3971        e, null);
3972      return;
3973    }
3974    callable.init(request.getProcData().toByteArray(), server);
3975    LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
3976    server.executeProcedure(request.getProcId(), request.getInitiatingMasterActiveTime(), callable);
3977  }
3978
3979  @Override
3980  @QosPriority(priority = HConstants.ADMIN_QOS)
3981  public ExecuteProceduresResponse executeProcedures(RpcController controller,
3982    ExecuteProceduresRequest request) throws ServiceException {
3983    try {
3984      checkOpen();
3985      throwOnWrongStartCode(request);
3986      server.getRegionServerCoprocessorHost().preExecuteProcedures();
3987      if (request.getOpenRegionCount() > 0) {
3988        // Avoid reading from the TableDescritor every time(usually it will read from the file
3989        // system)
3990        Map<TableName, TableDescriptor> tdCache = new HashMap<>();
3991        request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
3992      }
3993      if (request.getCloseRegionCount() > 0) {
3994        request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
3995      }
3996      if (request.getProcCount() > 0) {
3997        request.getProcList().forEach(this::executeProcedures);
3998      }
3999      server.getRegionServerCoprocessorHost().postExecuteProcedures();
4000      return ExecuteProceduresResponse.getDefaultInstance();
4001    } catch (IOException e) {
4002      throw new ServiceException(e);
4003    }
4004  }
4005
4006  @Override
4007  public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller,
4008    GetAllBootstrapNodesRequest request) throws ServiceException {
4009    GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder();
4010    server.getBootstrapNodes()
4011      .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server)));
4012    return builder.build();
4013  }
4014
4015  private void setReloadableGuardrails(Configuration conf) {
4016    rowSizeWarnThreshold =
4017      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
4018    rejectRowsWithSizeOverThreshold =
4019      conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
4020    maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
4021      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
4022  }
4023
  /**
   * Invoked when the live configuration is reloaded; lets the superclass apply its own updates
   * first, then refreshes the reloadable guardrail settings from the new configuration.
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    super.onConfigurationChange(conf);
    setReloadableGuardrails(conf);
  }
4029
4030  @Override
4031  public GetCachedFilesListResponse getCachedFilesList(RpcController controller,
4032    GetCachedFilesListRequest request) throws ServiceException {
4033    GetCachedFilesListResponse.Builder responseBuilder = GetCachedFilesListResponse.newBuilder();
4034    List<String> fullyCachedFiles = new ArrayList<>();
4035    server.getBlockCache().flatMap(BlockCache::getFullyCachedFiles).ifPresent(fcf -> {
4036      fullyCachedFiles.addAll(fcf.keySet());
4037    });
4038    return responseBuilder.addAllCachedFiles(fullyCachedFiles).build();
4039  }
4040
4041  RegionScannerContext checkQuotaAndGetRegionScannerContext(ScanRequest request,
4042    ScanResponse.Builder builder) throws IOException {
4043    if (request.hasScannerId()) {
4044      // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
4045      // for more details.
4046      long scannerId = request.getScannerId();
4047      builder.setScannerId(scannerId);
4048      String scannerName = toScannerName(scannerId);
4049      RegionScannerHolder rsh = getRegionScanner(request);
4050      OperationQuota quota =
4051        getRpcQuotaManager().checkScanQuota(rsh.r, request, maxScannerResultSize,
4052          rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference());
4053      return new RegionScannerContext(scannerName, rsh, quota);
4054    }
4055
4056    HRegion region = getRegion(request.getRegion());
4057    OperationQuota quota =
4058      getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize, 0L, 0L);
4059    Pair<String, RegionScannerHolder> pair = newRegionScanner(request, region, builder);
4060    return new RegionScannerContext(pair.getFirst(), pair.getSecond(), quota);
4061  }
4062}