001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.io.UncheckedIOException;
024import java.net.BindException;
025import java.net.InetAddress;
026import java.net.InetSocketAddress;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.Iterator;
032import java.util.List;
033import java.util.Map;
034import java.util.Map.Entry;
035import java.util.NavigableMap;
036import java.util.Set;
037import java.util.TreeSet;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentMap;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicBoolean;
042import java.util.concurrent.atomic.AtomicLong;
043import java.util.concurrent.atomic.LongAdder;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.FileSystem;
046import org.apache.hadoop.fs.Path;
047import org.apache.hadoop.hbase.CacheEvictionStats;
048import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
049import org.apache.hadoop.hbase.Cell;
050import org.apache.hadoop.hbase.CellScanner;
051import org.apache.hadoop.hbase.CellUtil;
052import org.apache.hadoop.hbase.DoNotRetryIOException;
053import org.apache.hadoop.hbase.DroppedSnapshotException;
054import org.apache.hadoop.hbase.ExtendedCellScannable;
055import org.apache.hadoop.hbase.ExtendedCellScanner;
056import org.apache.hadoop.hbase.HBaseIOException;
057import org.apache.hadoop.hbase.HBaseRpcServicesBase;
058import org.apache.hadoop.hbase.HConstants;
059import org.apache.hadoop.hbase.MultiActionResultTooLarge;
060import org.apache.hadoop.hbase.NotServingRegionException;
061import org.apache.hadoop.hbase.PrivateCellUtil;
062import org.apache.hadoop.hbase.RegionTooBusyException;
063import org.apache.hadoop.hbase.Server;
064import org.apache.hadoop.hbase.ServerName;
065import org.apache.hadoop.hbase.TableName;
066import org.apache.hadoop.hbase.UnknownScannerException;
067import org.apache.hadoop.hbase.client.Append;
068import org.apache.hadoop.hbase.client.CheckAndMutate;
069import org.apache.hadoop.hbase.client.CheckAndMutateResult;
070import org.apache.hadoop.hbase.client.Delete;
071import org.apache.hadoop.hbase.client.Durability;
072import org.apache.hadoop.hbase.client.Get;
073import org.apache.hadoop.hbase.client.Increment;
074import org.apache.hadoop.hbase.client.Mutation;
075import org.apache.hadoop.hbase.client.OperationWithAttributes;
076import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
077import org.apache.hadoop.hbase.client.Put;
078import org.apache.hadoop.hbase.client.RegionInfo;
079import org.apache.hadoop.hbase.client.RegionReplicaUtil;
080import org.apache.hadoop.hbase.client.Result;
081import org.apache.hadoop.hbase.client.Row;
082import org.apache.hadoop.hbase.client.Scan;
083import org.apache.hadoop.hbase.client.TableDescriptor;
084import org.apache.hadoop.hbase.client.VersionInfoUtil;
085import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
086import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
087import org.apache.hadoop.hbase.exceptions.ScannerResetException;
088import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
089import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
090import org.apache.hadoop.hbase.io.ByteBuffAllocator;
091import org.apache.hadoop.hbase.io.hfile.BlockCache;
092import org.apache.hadoop.hbase.ipc.HBaseRpcController;
093import org.apache.hadoop.hbase.ipc.PriorityFunction;
094import org.apache.hadoop.hbase.ipc.QosPriority;
095import org.apache.hadoop.hbase.ipc.RpcCall;
096import org.apache.hadoop.hbase.ipc.RpcCallContext;
097import org.apache.hadoop.hbase.ipc.RpcCallback;
098import org.apache.hadoop.hbase.ipc.RpcServer;
099import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
100import org.apache.hadoop.hbase.ipc.RpcServerFactory;
101import org.apache.hadoop.hbase.ipc.RpcServerInterface;
102import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
103import org.apache.hadoop.hbase.ipc.ServerRpcController;
104import org.apache.hadoop.hbase.net.Address;
105import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
106import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
107import org.apache.hadoop.hbase.quotas.OperationQuota;
108import org.apache.hadoop.hbase.quotas.QuotaUtil;
109import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
110import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
111import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
112import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
113import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease;
114import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException;
115import org.apache.hadoop.hbase.regionserver.Region.Operation;
116import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
117import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
118import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler;
119import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
120import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
121import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
122import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
123import org.apache.hadoop.hbase.replication.ReplicationUtils;
124import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker;
125import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker;
126import org.apache.hadoop.hbase.security.Superusers;
127import org.apache.hadoop.hbase.security.access.Permission;
128import org.apache.hadoop.hbase.util.Bytes;
129import org.apache.hadoop.hbase.util.DNS;
130import org.apache.hadoop.hbase.util.DNS.ServerType;
131import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
132import org.apache.hadoop.hbase.util.Pair;
133import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
134import org.apache.hadoop.hbase.wal.WAL;
135import org.apache.hadoop.hbase.wal.WALEdit;
136import org.apache.hadoop.hbase.wal.WALKey;
137import org.apache.hadoop.hbase.wal.WALSplitUtil;
138import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
139import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
140import org.apache.yetus.audience.InterfaceAudience;
141import org.slf4j.Logger;
142import org.slf4j.LoggerFactory;
143
144import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
145import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
146import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
147import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
148import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
149import org.apache.hbase.thirdparty.com.google.protobuf.Message;
150import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
151import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
152import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
153import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
154import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
155
156import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
157import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
158import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
247
248/**
249 * Implements the regionserver RPC services.
250 */
251@InterfaceAudience.Private
252public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
253  implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface {
254
  private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);

  /** Configuration key naming the RPC scheduler factory class to use for the region server. */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   * (milliseconds).
   */
  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Max cumulative size of results a single scan RPC may return.
  // NOTE(review): volatile, so presumably updated at runtime (e.g. online config change) —
  // confirm where it is written outside the constructor.
  private volatile long maxScannerResultSize;

  // Produces the ids used to key client scanners in the 'scanners' map below.
  private ScannerIdGenerator scannerIdGenerator;
  // All currently-open client scanners on this server, keyed by scanner name/id.
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name and last sequence number of a closed scanner for a while. This is used
  // to keep compatible for old clients which may send next or close request to a region
  // scanner which has already been exhausted. The entries will be removed automatically
  // after scannerLeaseTimeoutPeriod.
  private final Cache<String, Long> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan time limit (milliseconds). See
   * {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}.
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row count threshold for multi requests above which a warning is logged
   */
  private volatile int rowSizeWarnThreshold;
  /**
   * Whether we should reject requests with a very high number of rows, i.e. beyond the threshold
   * defined by rowSizeWarnThreshold
   */
  private volatile boolean rejectRowsWithSizeOverThreshold;

  // CAS flag; true while a clear-compaction-queues operation is in flight.
  // NOTE(review): presumably used to reject concurrent clearCompactionQueues RPCs — confirm.
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below booleans
   * to selectively enable/disable these services (Rare is the case where you would ever turn off
   * one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
    "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
    "hbase.regionserver.client.executorService";
  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
    "hbase.regionserver.client.meta.executorService";
  public static final String REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG =
    "hbase.regionserver.bootstrap.nodes.executorService";
353
354  /**
355   * An Rpc callback for closing a RegionScanner.
356   */
357  private static final class RegionScannerCloseCallBack implements RpcCallback {
358
359    private final RegionScanner scanner;
360
361    public RegionScannerCloseCallBack(RegionScanner scanner) {
362      this.scanner = scanner;
363    }
364
365    @Override
366    public void run() throws IOException {
367      this.scanner.close();
368    }
369  }
370
371  /**
372   * An Rpc callback for doing shipped() call on a RegionScanner.
373   */
374  private class RegionScannerShippedCallBack implements RpcCallback {
375    private final String scannerName;
376    private final Shipper shipper;
377    private final Lease lease;
378
379    public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
380      this.scannerName = scannerName;
381      this.shipper = shipper;
382      this.lease = lease;
383    }
384
385    @Override
386    public void run() throws IOException {
387      this.shipper.shipped();
388      // We're done. On way out re-add the above removed lease. The lease was temp removed for this
389      // Rpc call and we are at end of the call now. Time to add it back.
390      if (scanners.containsKey(scannerName)) {
391        if (lease != null) {
392          server.getLeaseManager().addLease(lease);
393        }
394      }
395    }
396  }
397
398  /**
399   * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
400   * completion of multiGets.
401   */
402  static class RegionScannersCloseCallBack implements RpcCallback {
403    private final List<RegionScanner> scanners = new ArrayList<>();
404
405    public void addScanner(RegionScanner scanner) {
406      this.scanners.add(scanner);
407    }
408
409    @Override
410    public void run() {
411      for (RegionScanner scanner : scanners) {
412        try {
413          scanner.close();
414        } catch (IOException e) {
415          LOG.error("Exception while closing the scanner " + scanner, e);
416        }
417      }
418    }
419  }
420
421  /**
422   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
423   */
424  static final class RegionScannerHolder {
425    private final AtomicLong nextCallSeq = new AtomicLong(0);
426    private final RegionScanner s;
427    private final HRegion r;
428    private final RpcCallback closeCallBack;
429    private final RpcCallback shippedCallback;
430    private byte[] rowOfLastPartialResult;
431    private boolean needCursor;
432    private boolean fullRegionScan;
433    private final String clientIPAndPort;
434    private final String userName;
435    private volatile long maxBlockBytesScanned = 0;
436    private volatile long prevBlockBytesScanned = 0;
437    private volatile long prevBlockBytesScannedDifference = 0;
438
439    RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
440      RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
441      String clientIPAndPort, String userName) {
442      this.s = s;
443      this.r = r;
444      this.closeCallBack = closeCallBack;
445      this.shippedCallback = shippedCallback;
446      this.needCursor = needCursor;
447      this.fullRegionScan = fullRegionScan;
448      this.clientIPAndPort = clientIPAndPort;
449      this.userName = userName;
450    }
451
452    long getNextCallSeq() {
453      return nextCallSeq.get();
454    }
455
456    boolean incNextCallSeq(long currentSeq) {
457      // Use CAS to prevent multiple scan request running on the same scanner.
458      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
459    }
460
461    long getMaxBlockBytesScanned() {
462      return maxBlockBytesScanned;
463    }
464
465    long getPrevBlockBytesScannedDifference() {
466      return prevBlockBytesScannedDifference;
467    }
468
469    void updateBlockBytesScanned(long blockBytesScanned) {
470      prevBlockBytesScannedDifference = blockBytesScanned - prevBlockBytesScanned;
471      prevBlockBytesScanned = blockBytesScanned;
472      if (blockBytesScanned > maxBlockBytesScanned) {
473        maxBlockBytesScanned = blockBytesScanned;
474      }
475    }
476
477    // Should be called only when we need to print lease expired messages otherwise
478    // cache the String once made.
479    @Override
480    public String toString() {
481      return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName
482        + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString();
483    }
484  }
485
486  /**
487   * Instantiated as a scanner lease. If the lease times out, the scanner is closed
488   */
489  private class ScannerListener implements LeaseListener {
490    private final String scannerName;
491
492    ScannerListener(final String n) {
493      this.scannerName = n;
494    }
495
496    @Override
497    public void leaseExpired() {
498      RegionScannerHolder rsh = scanners.remove(this.scannerName);
499      if (rsh == null) {
500        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
501        return;
502      }
503      LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
504      server.getMetrics().incrScannerLeaseExpired();
505      RegionScanner s = rsh.s;
506      HRegion region = null;
507      try {
508        region = server.getRegion(s.getRegionInfo().getRegionName());
509        if (region != null && region.getCoprocessorHost() != null) {
510          region.getCoprocessorHost().preScannerClose(s);
511        }
512      } catch (IOException e) {
513        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
514      } finally {
515        try {
516          s.close();
517          if (region != null && region.getCoprocessorHost() != null) {
518            region.getCoprocessorHost().postScannerClose(s);
519          }
520        } catch (IOException e) {
521          LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
522        }
523      }
524    }
525  }
526
527  private static ResultOrException getResultOrException(final ClientProtos.Result r,
528    final int index) {
529    return getResultOrException(ResponseConverter.buildActionResult(r), index);
530  }
531
532  private static ResultOrException getResultOrException(final Exception e, final int index) {
533    return getResultOrException(ResponseConverter.buildActionResult(e), index);
534  }
535
536  private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
537    final int index) {
538    return builder.setIndex(index).build();
539  }
540
541  /**
542   * Checks for the following pre-checks in order:
543   * <ol>
544   * <li>RegionServer is running</li>
545   * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
546   * </ol>
547   * @param requestName name of rpc request. Used in reporting failures to provide context.
548   * @throws ServiceException If any of the above listed pre-check fails.
549   */
550  private void rpcPreCheck(String requestName) throws ServiceException {
551    try {
552      checkOpen();
553      requirePermission(requestName, Permission.Action.ADMIN);
554    } catch (IOException ioe) {
555      throw new ServiceException(ioe);
556    }
557  }
558
559  private boolean isClientCellBlockSupport(RpcCallContext context) {
560    return context != null && context.isClientCellBlockSupported();
561  }
562
563  private void addResult(final MutateResponse.Builder builder, final Result result,
564    final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
565    if (result == null) return;
566    if (clientCellBlockSupported) {
567      builder.setResult(ProtobufUtil.toResultNoData(result));
568      rpcc.setCellScanner(result.cellScanner());
569    } else {
570      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
571      builder.setResult(pbr);
572    }
573  }
574
575  private void addResults(ScanResponse.Builder builder, List<Result> results,
576    HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
577    builder.setStale(!isDefaultRegion);
578    if (results.isEmpty()) {
579      return;
580    }
581    if (clientCellBlockSupported) {
582      for (Result res : results) {
583        builder.addCellsPerResult(res.size());
584        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
585      }
586      controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(results));
587    } else {
588      for (Result res : results) {
589        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
590        builder.addResults(pbr);
591      }
592    }
593  }
594
  /**
   * Builds and executes an atomic check-and-mutate from a multi request: converts each action's
   * MutationProto into a client Mutation (consuming its cells from the cell scanner), applies
   * cell-size and space-quota checks, and runs the resulting CheckAndMutate through the region
   * with coprocessor pre/post hooks.
   * @param region                the region the mutations apply to
   * @param actions               the actions making up the mutations; Get actions are rejected
   * @param cellScanner           carries the cell payloads referenced by the mutation protos
   * @param condition             the condition that gates the mutations
   * @param nonceGroup            client nonce group, used with the per-mutation nonce for
   *                              idempotent increment/append
   * @param spaceQuotaEnforcement active space-quota policies to check each mutation against
   * @return the result to send back to the client
   * @throws IOException if an action is invalid or the region operation fails
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
    CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Number of actions whose cells have been fully consumed from cellScanner. On early exit the
    // finally block skips the cells of the remaining actions so the scanner stays aligned.
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      List<Mutation> mutations = new ArrayList<>();
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : actions) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto mutation = action.getMutation();
        MutationType type = mutation.getMutateType();
        switch (type) {
          case PUT:
            // toPut consumes this action's cells, so the count is bumped immediately after.
            Put put = ProtobufUtil.toPut(mutation, cellScanner);
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            mutations.add(put);
            break;
          case DELETE:
            Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
            ++countOfCompleteMutation;
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            mutations.add(del);
            break;
          case INCREMENT:
            Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
            // Only increment/append carry a nonce; the last one seen is passed to the region.
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, increment);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
            mutations.add(increment);
            break;
          case APPEND:
            Append append = ProtobufUtil.toAppend(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, append);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
            mutations.add(append);
            break;
          default:
            throw new DoNotRetryIOException("invalid mutation type : " + type);
        }
      }

      if (mutations.size() == 0) {
        // Nothing to do; an empty mutation list is treated as a trivially-successful check.
        return new CheckAndMutateResult(true, null);
      } else {
        CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
        CheckAndMutateResult result = null;
        if (region.getCoprocessorHost() != null) {
          result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
        }
        // A non-null result from the pre-hook means a coprocessor bypassed the operation.
        if (result == null) {
          result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
          if (region.getCoprocessorHost() != null) {
            result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
          }
        }
        return result;
      }
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }
671
672  /**
673   * Execute an append mutation.
674   * @return result to return to client if default operation should be bypassed as indicated by
675   *         RegionObserver, null otherwise
676   */
677  private Result append(final HRegion region, final OperationQuota quota,
678    final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
679    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
680    long before = EnvironmentEdgeManager.currentTime();
681    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
682    checkCellSizeLimit(region, append);
683    spaceQuota.getPolicyEnforcement(region).check(append);
684    quota.addMutation(append);
685    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
686    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
687    Result r = region.append(append, nonceGroup, nonce);
688    if (server.getMetrics() != null) {
689      long blockBytesScanned =
690        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
691      server.getMetrics().updateAppend(region, EnvironmentEdgeManager.currentTime() - before,
692        blockBytesScanned);
693    }
694    return r == null ? Result.EMPTY_RESULT : r;
695  }
696
697  /**
698   * Execute an increment mutation.
699   */
700  private Result increment(final HRegion region, final OperationQuota quota,
701    final MutationProto mutation, final CellScanner cells, long nonceGroup,
702    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
703    long before = EnvironmentEdgeManager.currentTime();
704    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
705    checkCellSizeLimit(region, increment);
706    spaceQuota.getPolicyEnforcement(region).check(increment);
707    quota.addMutation(increment);
708    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
709    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
710    Result r = region.increment(increment, nonceGroup, nonce);
711    final MetricsRegionServer metricsRegionServer = server.getMetrics();
712    if (metricsRegionServer != null) {
713      long blockBytesScanned =
714        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
715      metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before,
716        blockBytesScanned);
717    }
718    return r == null ? Result.EMPTY_RESULT : r;
719  }
720
721  /**
722   * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
723   * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
724   * @param cellsToReturn Could be null. May be allocated in this method. This is what this method
725   *                      returns as a 'result'.
726   * @param closeCallBack the callback to be used with multigets
727   * @param context       the current RpcCallContext
728   * @return Return the <code>cellScanner</code> passed
729   */
730  private List<ExtendedCellScannable> doNonAtomicRegionMutation(final HRegion region,
731    final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
732    final RegionActionResult.Builder builder, List<ExtendedCellScannable> cellsToReturn,
733    long nonceGroup, final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
734    ActivePolicyEnforcement spaceQuotaEnforcement) {
735    // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
736    // one at a time, we instead pass them in batch. Be aware that the corresponding
737    // ResultOrException instance that matches each Put or Delete is then added down in the
738    // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
739    // deferred/batched
740    List<ClientProtos.Action> mutations = null;
741    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
742    IOException sizeIOE = null;
743    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
744      ResultOrException.newBuilder();
745    boolean hasResultOrException = false;
746    for (ClientProtos.Action action : actions.getActionList()) {
747      hasResultOrException = false;
748      resultOrExceptionBuilder.clear();
749      try {
750        Result r = null;
751        long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
752        if (
753          context != null && context.isRetryImmediatelySupported()
754            && (context.getResponseCellSize() > maxQuotaResultSize
755              || blockBytesScannedBefore + context.getResponseExceptionSize() > maxQuotaResultSize)
756        ) {
757
758          // We're storing the exception since the exception and reason string won't
759          // change after the response size limit is reached.
760          if (sizeIOE == null) {
761            // We don't need the stack un-winding do don't throw the exception.
762            // Throwing will kill the JVM's JIT.
763            //
764            // Instead just create the exception and then store it.
765            sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
766              + context.getResponseCellSize() + " BlockSize: " + blockBytesScannedBefore);
767
768            // Only report the exception once since there's only one request that
769            // caused the exception. Otherwise this number will dominate the exceptions count.
770            rpcServer.getMetrics().exception(sizeIOE);
771          }
772
773          // Now that there's an exception is known to be created
774          // use it for the response.
775          //
776          // This will create a copy in the builder.
777          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
778          resultOrExceptionBuilder.setException(pair);
779          context.incrementResponseExceptionSize(pair.getSerializedSize());
780          resultOrExceptionBuilder.setIndex(action.getIndex());
781          builder.addResultOrException(resultOrExceptionBuilder.build());
782          skipCellsForMutation(action, cellScanner);
783          continue;
784        }
785        if (action.hasGet()) {
786          long before = EnvironmentEdgeManager.currentTime();
787          ClientProtos.Get pbGet = action.getGet();
788          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
789          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
790          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
791          // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
792          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
793            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
794              + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
795              + "reverse Scan.");
796          }
797          try {
798            Get get = ProtobufUtil.toGet(pbGet);
799            if (context != null) {
800              r = get(get, (region), closeCallBack, context);
801            } else {
802              r = region.get(get);
803            }
804          } finally {
805            final MetricsRegionServer metricsRegionServer = server.getMetrics();
806            if (metricsRegionServer != null) {
807              long blockBytesScanned =
808                context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
809              metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
810                blockBytesScanned);
811            }
812          }
813        } else if (action.hasServiceCall()) {
814          hasResultOrException = true;
815          Message result = execServiceOnRegion(region, action.getServiceCall());
816          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
817            ClientProtos.CoprocessorServiceResult.newBuilder();
818          resultOrExceptionBuilder.setServiceResult(serviceResultBuilder
819            .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName())
820              // TODO: Copy!!!
821              .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
822        } else if (action.hasMutation()) {
823          MutationType type = action.getMutation().getMutateType();
824          if (
825            type != MutationType.PUT && type != MutationType.DELETE && mutations != null
826              && !mutations.isEmpty()
827          ) {
828            // Flush out any Puts or Deletes already collected.
829            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
830              spaceQuotaEnforcement);
831            mutations.clear();
832          }
833          switch (type) {
834            case APPEND:
835              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
836                spaceQuotaEnforcement, context);
837              break;
838            case INCREMENT:
839              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
840                spaceQuotaEnforcement, context);
841              break;
842            case PUT:
843            case DELETE:
844              // Collect the individual mutations and apply in a batch
845              if (mutations == null) {
846                mutations = new ArrayList<>(actions.getActionCount());
847              }
848              mutations.add(action);
849              break;
850            default:
851              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
852          }
853        } else {
854          throw new HBaseIOException("Unexpected Action type");
855        }
856        if (r != null) {
857          ClientProtos.Result pbResult = null;
858          if (isClientCellBlockSupport(context)) {
859            pbResult = ProtobufUtil.toResultNoData(r);
860            // Hard to guess the size here. Just make a rough guess.
861            if (cellsToReturn == null) {
862              cellsToReturn = new ArrayList<>();
863            }
864            cellsToReturn.add(r);
865          } else {
866            pbResult = ProtobufUtil.toResult(r);
867          }
868          addSize(context, r);
869          hasResultOrException = true;
870          resultOrExceptionBuilder.setResult(pbResult);
871        }
872        // Could get to here and there was no result and no exception. Presumes we added
873        // a Put or Delete to the collecting Mutations List for adding later. In this
874        // case the corresponding ResultOrException instance for the Put or Delete will be added
875        // down in the doNonAtomicBatchOp method call rather than up here.
876      } catch (IOException ie) {
877        rpcServer.getMetrics().exception(ie);
878        hasResultOrException = true;
879        NameBytesPair pair = ResponseConverter.buildException(ie);
880        resultOrExceptionBuilder.setException(pair);
881        context.incrementResponseExceptionSize(pair.getSerializedSize());
882      }
883      if (hasResultOrException) {
884        // Propagate index.
885        resultOrExceptionBuilder.setIndex(action.getIndex());
886        builder.addResultOrException(resultOrExceptionBuilder.build());
887      }
888    }
889    // Finish up any outstanding mutations
890    if (!CollectionUtils.isEmpty(mutations)) {
891      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
892    }
893    return cellsToReturn;
894  }
895
896  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
897    if (r.maxCellSize > 0) {
898      CellScanner cells = m.cellScanner();
899      while (cells.advance()) {
900        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
901        if (size > r.maxCellSize) {
902          String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of "
903            + r.maxCellSize + " bytes";
904          LOG.debug(msg);
905          throw new DoNotRetryIOException(msg);
906        }
907      }
908    }
909  }
910
  /**
   * Execute an atomic batch of mutations (mutateRow / RowMutations) against {@code region}.
   * @throws IOException propagated so the caller can attach a region-level exception
   */
  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Just throw the exception. The exception will be caught and then added to the region-level
    // exception for the RegionAction. Leaving the null action result is ok since a null
    // result is viewed as a failure by the hbase client, and the region-level exception will be
    // used to replace the null result. See AsyncRequestFutureImpl#receiveMultiAction and
    // AsyncBatchRpcRetryingCaller#onComplete for more details.
    doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
  }
921
  /**
   * Execute a non-atomic batch of mutations. On failure, an exception result is attached to
   * each action individually instead of failing the whole RegionAction.
   */
  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    try {
      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
        spaceQuotaEnforcement, false);
    } catch (IOException e) {
      // Set the exception for each action. The mutations in the same RegionAction are grouped
      // into different batches and then processed individually. Hence, we don't set the
      // region-level exception here for the whole RegionAction.
      for (Action mutation : mutations) {
        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
      }
    }
  }
937
938  /**
939   * Execute a list of mutations.
940   */
941  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
942    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
943    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
944    throws IOException {
945    Mutation[] mArray = new Mutation[mutations.size()];
946    long before = EnvironmentEdgeManager.currentTime();
947    boolean batchContainsPuts = false, batchContainsDelete = false;
948    try {
949      /**
950       * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions
951       * since mutation array may have been reoredered.In order to return the right result or
952       * exception to the corresponding actions, We need to know which action is the mutation belong
953       * to. We can't sort ClientProtos.Action array, since they are bonded to cellscanners.
954       */
955      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
956      int i = 0;
957      long nonce = HConstants.NO_NONCE;
958      for (ClientProtos.Action action : mutations) {
959        if (action.hasGet()) {
960          throw new DoNotRetryIOException(
961            "Atomic put and/or delete only, not a Get=" + action.getGet());
962        }
963        MutationProto m = action.getMutation();
964        Mutation mutation;
965        switch (m.getMutateType()) {
966          case PUT:
967            mutation = ProtobufUtil.toPut(m, cells);
968            batchContainsPuts = true;
969            break;
970
971          case DELETE:
972            mutation = ProtobufUtil.toDelete(m, cells);
973            batchContainsDelete = true;
974            break;
975
976          case INCREMENT:
977            mutation = ProtobufUtil.toIncrement(m, cells);
978            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
979            break;
980
981          case APPEND:
982            mutation = ProtobufUtil.toAppend(m, cells);
983            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
984            break;
985
986          default:
987            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
988        }
989        mutationActionMap.put(mutation, action);
990        mArray[i++] = mutation;
991        checkCellSizeLimit(region, mutation);
992        // Check if a space quota disallows this mutation
993        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
994        quota.addMutation(mutation);
995      }
996
997      if (!region.getRegionInfo().isMetaRegion()) {
998        server.getMemStoreFlusher().reclaimMemStoreMemory();
999      }
1000
1001      // HBASE-17924
1002      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
1003      // order is preserved as its expected from the client
1004      if (!atomic) {
1005        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
1006      }
1007
1008      OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);
1009
1010      // When atomic is true, it indicates that the mutateRow API or the batch API with
1011      // RowMutations is called. In this case, we need to merge the results of the
1012      // Increment/Append operations if the mutations include those operations, and set the merged
1013      // result to the first element of the ResultOrException list
1014      if (atomic) {
1015        List<ResultOrException> resultOrExceptions = new ArrayList<>();
1016        List<Result> results = new ArrayList<>();
1017        for (i = 0; i < codes.length; i++) {
1018          if (codes[i].getResult() != null) {
1019            results.add(codes[i].getResult());
1020          }
1021          if (i != 0) {
1022            resultOrExceptions
1023              .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i));
1024          }
1025        }
1026
1027        if (results.isEmpty()) {
1028          builder.addResultOrException(
1029            getResultOrException(ClientProtos.Result.getDefaultInstance(), 0));
1030        } else {
1031          // Merge the results of the Increment/Append operations
1032          List<Cell> cellList = new ArrayList<>();
1033          for (Result result : results) {
1034            if (result.rawCells() != null) {
1035              cellList.addAll(Arrays.asList(result.rawCells()));
1036            }
1037          }
1038          Result result = Result.create(cellList);
1039
1040          // Set the merged result of the Increment/Append operations to the first element of the
1041          // ResultOrException list
1042          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
1043        }
1044
1045        builder.addAllResultOrException(resultOrExceptions);
1046        return;
1047      }
1048
1049      for (i = 0; i < codes.length; i++) {
1050        Mutation currentMutation = mArray[i];
1051        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
1052        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
1053        Exception e;
1054        switch (codes[i].getOperationStatusCode()) {
1055          case BAD_FAMILY:
1056            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
1057            builder.addResultOrException(getResultOrException(e, index));
1058            break;
1059
1060          case SANITY_CHECK_FAILURE:
1061            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
1062            builder.addResultOrException(getResultOrException(e, index));
1063            break;
1064
1065          default:
1066            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
1067            builder.addResultOrException(getResultOrException(e, index));
1068            break;
1069
1070          case SUCCESS:
1071            ClientProtos.Result result = codes[i].getResult() == null
1072              ? ClientProtos.Result.getDefaultInstance()
1073              : ProtobufUtil.toResult(codes[i].getResult());
1074            builder.addResultOrException(getResultOrException(result, index));
1075            break;
1076
1077          case STORE_TOO_BUSY:
1078            e = new RegionTooBusyException(codes[i].getExceptionMsg());
1079            builder.addResultOrException(getResultOrException(e, index));
1080            break;
1081        }
1082      }
1083    } finally {
1084      int processedMutationIndex = 0;
1085      for (Action mutation : mutations) {
1086        // The non-null mArray[i] means the cell scanner has been read.
1087        if (mArray[processedMutationIndex++] == null) {
1088          skipCellsForMutation(mutation, cells);
1089        }
1090      }
1091      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1092    }
1093  }
1094
1095  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
1096    boolean batchContainsDelete) {
1097    final MetricsRegionServer metricsRegionServer = server.getMetrics();
1098    if (metricsRegionServer != null) {
1099      long after = EnvironmentEdgeManager.currentTime();
1100      if (batchContainsPuts) {
1101        metricsRegionServer.updatePutBatch(region, after - starttime);
1102      }
1103      if (batchContainsDelete) {
1104        metricsRegionServer.updateDeleteBatch(region, after - starttime);
1105      }
1106    }
1107  }
1108
1109  /**
1110   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
1111   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
1112   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
1113   *         exceptionMessage if any
1114   * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying
1115   *             edits for secondary replicas any more, see
1116   *             {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}.
1117   */
1118  @Deprecated
1119  private OperationStatus[] doReplayBatchOp(final HRegion region,
1120    final List<MutationReplay> mutations, long replaySeqId) throws IOException {
1121    long before = EnvironmentEdgeManager.currentTime();
1122    boolean batchContainsPuts = false, batchContainsDelete = false;
1123    try {
1124      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
1125        MutationReplay m = it.next();
1126
1127        if (m.getType() == MutationType.PUT) {
1128          batchContainsPuts = true;
1129        } else {
1130          batchContainsDelete = true;
1131        }
1132
1133        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
1134        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
1135        if (metaCells != null && !metaCells.isEmpty()) {
1136          for (Cell metaCell : metaCells) {
1137            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
1138            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1139            HRegion hRegion = region;
1140            if (compactionDesc != null) {
1141              // replay the compaction. Remove the files from stores only if we are the primary
1142              // region replica (thus own the files)
1143              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
1144                replaySeqId);
1145              continue;
1146            }
1147            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
1148            if (flushDesc != null && !isDefaultReplica) {
1149              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
1150              continue;
1151            }
1152            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
1153            if (regionEvent != null && !isDefaultReplica) {
1154              hRegion.replayWALRegionEventMarker(regionEvent);
1155              continue;
1156            }
1157            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
1158            if (bulkLoadEvent != null) {
1159              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
1160              continue;
1161            }
1162          }
1163          it.remove();
1164        }
1165      }
1166      requestCount.increment();
1167      if (!region.getRegionInfo().isMetaRegion()) {
1168        server.getMemStoreFlusher().reclaimMemStoreMemory();
1169      }
1170      return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
1171        replaySeqId);
1172    } finally {
1173      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1174    }
1175  }
1176
1177  private void closeAllScanners() {
1178    // Close any outstanding scanners. Means they'll get an UnknownScanner
1179    // exception next time they come in.
1180    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
1181      try {
1182        e.getValue().s.close();
1183      } catch (IOException ioe) {
1184        LOG.warn("Closing scanner " + e.getKey(), ioe);
1185      }
1186    }
1187  }
1188
  // Directly invoked only for testing
  public RSRpcServices(final HRegionServer rs) throws IOException {
    super(rs, rs.getProcessName());
    final Configuration conf = rs.getConfiguration();
    setReloadableGuardrails(conf);
    // Client scanner lease timeout; also bounds the closed-scanner cache expiry below.
    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
    rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
    // NOTE(review): entries expire one lease period after last access — presumably so late
    // RPCs can distinguish recently-closed scanners from unknown ones; confirm against callers.
    closedScanners = CacheBuilder.newBuilder()
      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }
1204
  @Override
  protected boolean defaultReservoirEnabled() {
    // Region servers enable the byte-buffer allocator pool by default.
    return true;
  }
1209
  @Override
  protected ServerType getDNSServerType() {
    // DNS role used when resolving this server's hostname.
    return DNS.ServerType.REGIONSERVER;
  }
1214
  @Override
  protected String getHostname(Configuration conf, String defaultHostname) {
    // Allow operators to override the RPC bind hostname via hbase.regionserver.ipc.address.
    return conf.get("hbase.regionserver.ipc.address", defaultHostname);
  }
1219
  @Override
  protected String getPortConfigName() {
    // Config key that names the region server RPC port.
    return HConstants.REGIONSERVER_PORT;
  }
1224
  @Override
  protected int getDefaultPort() {
    // Port used when no explicit region server port is configured.
    return HConstants.DEFAULT_REGIONSERVER_PORT;
  }
1229
  @Override
  protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
    // Configurable RPC scheduler factory; falls back to the simple (FIFO-by-priority) factory.
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }
1235
1236  protected RpcServerInterface createRpcServer(final Server server,
1237    final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
1238    final String name) throws IOException {
1239    final Configuration conf = server.getConfiguration();
1240    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
1241    try {
1242      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, // use final
1243                                                                                        // bindAddress
1244                                                                                        // for this
1245                                                                                        // server.
1246        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
1247    } catch (BindException be) {
1248      throw new IOException(be.getMessage() + ". To switch ports use the '"
1249        + HConstants.REGIONSERVER_PORT + "' configuration property.",
1250        be.getCause() != null ? be.getCause() : be);
1251    }
1252  }
1253
1254  protected Class<?> getRpcSchedulerFactoryClass() {
1255    final Configuration conf = server.getConfiguration();
1256    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
1257      SimpleRpcSchedulerFactory.class);
1258  }
1259
  // Priority function that reads annotations to classify region server RPCs.
  protected PriorityFunction createPriority() {
    return new RSAnnotationReadingPriorityFunction(this);
  }
1263
  /** Returns the number of currently outstanding region scanners. */
  public int getScannersCount() {
    return scanners.size();
  }
1267
1268  /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
1269  RegionScanner getScanner(long scannerId) {
1270    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
1271    return rsh == null ? null : rsh.s;
1272  }
1273
  /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null. */
  private RegionScannerHolder getRegionScannerHolder(long scannerId) {
    // The scanners map is keyed by the string form of the id.
    return scanners.get(toScannerName(scannerId));
  }
1278
1279  public String getScanDetailsWithId(long scannerId) {
1280    RegionScanner scanner = getScanner(scannerId);
1281    if (scanner == null) {
1282      return null;
1283    }
1284    StringBuilder builder = new StringBuilder();
1285    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
1286    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
1287    builder.append(" operation_id: ").append(scanner.getOperationId());
1288    return builder.toString();
1289  }
1290
1291  public String getScanDetailsWithRequest(ScanRequest request) {
1292    try {
1293      if (!request.hasRegion()) {
1294        return null;
1295      }
1296      Region region = getRegion(request.getRegion());
1297      StringBuilder builder = new StringBuilder();
1298      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
1299      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
1300      for (NameBytesPair pair : request.getScan().getAttributeList()) {
1301        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
1302          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
1303          break;
1304        }
1305      }
1306      return builder.toString();
1307    } catch (IOException ignored) {
1308      return null;
1309    }
1310  }
1311
1312  /**
1313   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
1314   */
1315  long getScannerVirtualTime(long scannerId) {
1316    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
1317    return rsh == null ? 0L : rsh.getNextCallSeq();
1318  }
1319
1320  /**
1321   * Method to account for the size of retained cells.
1322   * @param context rpc call context
1323   * @param r       result to add size.
1324   * @return an object that represents the last referenced block from this response.
1325   */
1326  void addSize(RpcCallContext context, Result r) {
1327    if (context != null && r != null && !r.isEmpty()) {
1328      for (Cell c : r.rawCells()) {
1329        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
1330      }
1331    }
1332  }
1333
1334  /** Returns Remote client's ip and port else null if can't be determined. */
1335  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
1336      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
1337  static String getRemoteClientIpAndPort() {
1338    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
1339    if (rpcCall == null) {
1340      return HConstants.EMPTY_STRING;
1341    }
1342    InetAddress address = rpcCall.getRemoteAddress();
1343    if (address == null) {
1344      return HConstants.EMPTY_STRING;
1345    }
1346    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
1347    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
1348    // scanning than a hostname anyways.
1349    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
1350  }
1351
1352  /** Returns Remote client's username. */
1353  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
1354      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
1355  static String getUserName() {
1356    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
1357    if (rpcCall == null) {
1358      return HConstants.EMPTY_STRING;
1359    }
1360    return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING);
1361  }
1362
  /**
   * Registers a newly opened scanner: creates an expiration lease keyed by the scanner name,
   * wraps the scanner with shipped/close callbacks, and records the holder in the scanners map.
   * @param scannerName   unique string form of the scanner id (map key and lease name)
   * @param s             the scanner being registered
   * @param shipper       used by the shipped callback to release scanner resources
   * @param r             region the scanner reads from
   * @param needCursor    whether cursor results were requested for this scanner
   * @param fullRegionScan whether the scan spans the whole region (for metrics/diagnostics)
   * @return the holder now tracked for this scanner
   * @throws LeaseStillHeldException if a lease already exists under this scanner name
   */
  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
    HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
    Lease lease = server.getLeaseManager().createLease(scannerName, this.scannerLeaseTimeoutPeriod,
      new ScannerListener(scannerName));
    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
    // If the scanner itself is an RpcCallback, let it handle its own close notification.
    RpcCallback closeCallback =
      s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s);
    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
      needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
    // Scanner ids come from ScannerIdGenerator and are expected never to collide.
    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
      + scannerName + ", " + existing;
    return rsh;
  }
1377
1378  private boolean isFullRegionScan(Scan scan, HRegion region) {
1379    // If the scan start row equals or less than the start key of the region
1380    // and stop row greater than equals end key (if stop row present)
1381    // or if the stop row is empty
1382    // account this as a full region scan
1383    if (
1384      Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0
1385        && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0
1386          && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)
1387          || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
1388    ) {
1389      return true;
1390    }
1391    return false;
1392  }
1393
1394  /**
1395   * Find the HRegion based on a region specifier
1396   * @param regionSpecifier the region specifier
1397   * @return the corresponding region
1398   * @throws IOException if the specifier is not null, but failed to find the region
1399   */
1400  public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException {
1401    return server.getRegion(regionSpecifier.getValue().toByteArray());
1402  }
1403
1404  /**
1405   * Find the List of HRegions based on a list of region specifiers
1406   * @param regionSpecifiers the list of region specifiers
1407   * @return the corresponding list of regions
1408   * @throws IOException if any of the specifiers is not null, but failed to find the region
1409   */
1410  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
1411    final CacheEvictionStatsBuilder stats) {
1412    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
1413    for (RegionSpecifier regionSpecifier : regionSpecifiers) {
1414      try {
1415        regions.add(server.getRegion(regionSpecifier.getValue().toByteArray()));
1416      } catch (NotServingRegionException e) {
1417        stats.addException(regionSpecifier.getValue().toByteArray(), e);
1418      }
1419    }
1420    return regions;
1421  }
1422
  /** Returns the QoS priority function used for incoming RPCs. */
  PriorityFunction getPriority() {
    return priority;
  }
1426
  /** Returns the server's RPC quota manager. */
  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return server.getRegionServerRpcQuotaManager();
  }
1430
  /** Returns the server's space quota manager. */
  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return server.getRegionServerSpaceQuotaManager();
  }
1434
  /**
   * Starts these rpc services: seeds the scanner-id generator from the server name, then runs
   * subclass-specific startup.
   */
  void start(ZKWatcher zkWatcher) {
    this.scannerIdGenerator = new ScannerIdGenerator(this.server.getServerName());
    internalStart(zkWatcher);
  }
1439
  /** Stops these rpc services: closes all outstanding scanners, then runs subclass shutdown. */
  void stop() {
    closeAllScanners();
    internalStop();
  }
1444
1445  /**
1446   * Called to verify that this server is up and running.
1447   */
1448  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
1449  protected void checkOpen() throws IOException {
1450    if (server.isAborted()) {
1451      throw new RegionServerAbortedException("Server " + server.getServerName() + " aborting");
1452    }
1453    if (server.isStopped()) {
1454      throw new RegionServerStoppedException("Server " + server.getServerName() + " stopping");
1455    }
1456    if (!server.isDataFileSystemOk()) {
1457      throw new RegionServerStoppedException("File system not available");
1458    }
1459    if (!server.isOnline()) {
1460      throw new ServerNotRunningYetException(
1461        "Server " + server.getServerName() + " is not running yet");
1462    }
1463  }
1464
1465  /**
1466   * By default, put up an Admin and a Client Service. Set booleans
1467   * <code>hbase.regionserver.admin.executorService</code> and
1468   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
1469   * Default is that both are enabled.
1470   * @return immutable list of blocking services and the security info classes that this server
1471   *         supports
1472   */
1473  protected List<BlockingServiceAndInterface> getServices() {
1474    boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
1475    boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
1476    boolean clientMeta =
1477      getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
1478    boolean bootstrapNodes =
1479      getConfiguration().getBoolean(REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG, true);
1480    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
1481    if (client) {
1482      bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
1483        ClientService.BlockingInterface.class));
1484    }
1485    if (admin) {
1486      bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
1487        AdminService.BlockingInterface.class));
1488    }
1489    if (clientMeta) {
1490      bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
1491        ClientMetaService.BlockingInterface.class));
1492    }
1493    if (bootstrapNodes) {
1494      bssi.add(
1495        new BlockingServiceAndInterface(BootstrapNodeService.newReflectiveBlockingService(this),
1496          BootstrapNodeService.BlockingInterface.class));
1497    }
1498    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
1499  }
1500
1501  /**
1502   * Close a region on the region server.
1503   * @param controller the RPC controller
1504   * @param request    the request
1505   */
1506  @Override
1507  @QosPriority(priority = HConstants.ADMIN_QOS)
1508  public CloseRegionResponse closeRegion(final RpcController controller,
1509    final CloseRegionRequest request) throws ServiceException {
1510    final ServerName sn = (request.hasDestinationServer()
1511      ? ProtobufUtil.toServerName(request.getDestinationServer())
1512      : null);
1513
1514    try {
1515      checkOpen();
1516      throwOnWrongStartCode(request);
1517      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1518
1519      requestCount.increment();
1520      if (sn == null) {
1521        LOG.info("Close " + encodedRegionName + " without moving");
1522      } else {
1523        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1524      }
1525      boolean closed = server.closeRegion(encodedRegionName, false, sn);
1526      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1527      return builder.build();
1528    } catch (IOException ie) {
1529      throw new ServiceException(ie);
1530    }
1531  }
1532
1533  /**
1534   * Compact a region on the region server.
1535   * @param controller the RPC controller
1536   * @param request    the request
1537   */
1538  @Override
1539  @QosPriority(priority = HConstants.ADMIN_QOS)
1540  public CompactRegionResponse compactRegion(final RpcController controller,
1541    final CompactRegionRequest request) throws ServiceException {
1542    try {
1543      checkOpen();
1544      requestCount.increment();
1545      HRegion region = getRegion(request.getRegion());
1546      // Quota support is enabled, the requesting user is not system/super user
1547      // and a quota policy is enforced that disables compactions.
1548      if (
1549        QuotaUtil.isQuotaEnabled(getConfiguration())
1550          && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null))
1551          && this.server.getRegionServerSpaceQuotaManager()
1552            .areCompactionsDisabled(region.getTableDescriptor().getTableName())
1553      ) {
1554        throw new DoNotRetryIOException(
1555          "Compactions on this region are " + "disabled due to a space quota violation.");
1556      }
1557      region.startRegionOperation(Operation.COMPACT_REGION);
1558      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1559      boolean major = request.hasMajor() && request.getMajor();
1560      if (request.hasFamily()) {
1561        byte[] family = request.getFamily().toByteArray();
1562        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
1563          + region.getRegionInfo().getRegionNameAsString() + " and family "
1564          + Bytes.toString(family);
1565        LOG.trace(log);
1566        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
1567          CompactionLifeCycleTracker.DUMMY);
1568      } else {
1569        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
1570          + region.getRegionInfo().getRegionNameAsString();
1571        LOG.trace(log);
1572        region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
1573      }
1574      return CompactRegionResponse.newBuilder().build();
1575    } catch (IOException ie) {
1576      throw new ServiceException(ie);
1577    }
1578  }
1579
1580  @Override
1581  @QosPriority(priority = HConstants.ADMIN_QOS)
1582  public CompactionSwitchResponse compactionSwitch(RpcController controller,
1583    CompactionSwitchRequest request) throws ServiceException {
1584    rpcPreCheck("compactionSwitch");
1585    final CompactSplit compactSplitThread = server.getCompactSplitThread();
1586    requestCount.increment();
1587    boolean prevState = compactSplitThread.isCompactionsEnabled();
1588    CompactionSwitchResponse response =
1589      CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
1590    if (prevState == request.getEnabled()) {
1591      // passed in requested state is same as current state. No action required
1592      return response;
1593    }
1594    compactSplitThread.switchCompaction(request.getEnabled());
1595    return response;
1596  }
1597
1598  /**
1599   * Flush a region on the region server.
1600   * @param controller the RPC controller
1601   * @param request    the request
1602   */
1603  @Override
1604  @QosPriority(priority = HConstants.ADMIN_QOS)
1605  public FlushRegionResponse flushRegion(final RpcController controller,
1606    final FlushRegionRequest request) throws ServiceException {
1607    try {
1608      checkOpen();
1609      requestCount.increment();
1610      HRegion region = getRegion(request.getRegion());
1611      LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1612      boolean shouldFlush = true;
1613      if (request.hasIfOlderThanTs()) {
1614        shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1615      }
1616      FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1617      if (shouldFlush) {
1618        boolean writeFlushWalMarker =
1619          request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false;
1620        // Go behind the curtain so we can manage writing of the flush WAL marker
1621        HRegion.FlushResultImpl flushResult = null;
1622        if (request.hasFamily()) {
1623          List families = new ArrayList();
1624          families.add(request.getFamily().toByteArray());
1625          flushResult =
1626            region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1627        } else {
1628          flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1629        }
1630        boolean compactionNeeded = flushResult.isCompactionNeeded();
1631        if (compactionNeeded) {
1632          server.getCompactSplitThread().requestSystemCompaction(region,
1633            "Compaction through user triggered flush");
1634        }
1635        builder.setFlushed(flushResult.isFlushSucceeded());
1636        builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1637      }
1638      builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1639      return builder.build();
1640    } catch (DroppedSnapshotException ex) {
1641      // Cache flush can fail in a few places. If it fails in a critical
1642      // section, we get a DroppedSnapshotException and a replay of wal
1643      // is required. Currently the only way to do this is a restart of
1644      // the server.
1645      server.abort("Replay of WAL required. Forcing server shutdown", ex);
1646      throw new ServiceException(ex);
1647    } catch (IOException ie) {
1648      throw new ServiceException(ie);
1649    }
1650  }
1651
1652  @Override
1653  @QosPriority(priority = HConstants.ADMIN_QOS)
1654  public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1655    final GetOnlineRegionRequest request) throws ServiceException {
1656    try {
1657      checkOpen();
1658      requestCount.increment();
1659      Map<String, HRegion> onlineRegions = server.getOnlineRegions();
1660      List<RegionInfo> list = new ArrayList<>(onlineRegions.size());
1661      for (HRegion region : onlineRegions.values()) {
1662        list.add(region.getRegionInfo());
1663      }
1664      list.sort(RegionInfo.COMPARATOR);
1665      return ResponseConverter.buildGetOnlineRegionResponse(list);
1666    } catch (IOException ie) {
1667      throw new ServiceException(ie);
1668    }
1669  }
1670
  // Master implementation of this Admin Service differs given it is not
  // able to supply detail only known to RegionServer. See note on
  // MasterRpcServers#getRegionInfo.
  /**
   * Returns region metadata and, on request, the best split row and compaction state. When the
   * best split row is requested but cannot be computed (all data still in memstore), the region
   * is flushed once and the split check retried.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
    final GetRegionInfoRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      RegionInfo info = region.getRegionInfo();
      byte[] bestSplitRow;
      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
        bestSplitRow = region.checkSplit(true).orElse(null);
        // when all table data are in memstore, bestSplitRow = null
        // try to flush region first
        if (bestSplitRow == null) {
          region.flush(true);
          bestSplitRow = region.checkSplit(true).orElse(null);
        }
      } else {
        bestSplitRow = null;
      }
      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
      if (request.hasCompactionState() && request.getCompactionState()) {
        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
      }
      builder.setSplittable(region.isSplittable());
      builder.setMergeable(region.isMergeable());
      // Only set the split row when it was both requested and successfully computed.
      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }
1710
1711  @Override
1712  @QosPriority(priority = HConstants.ADMIN_QOS)
1713  public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
1714    throws ServiceException {
1715
1716    List<HRegion> regions;
1717    if (request.hasTableName()) {
1718      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
1719      regions = server.getRegions(tableName);
1720    } else {
1721      regions = server.getRegions();
1722    }
1723    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
1724    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
1725    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1726
1727    try {
1728      for (HRegion region : regions) {
1729        rLoads.add(server.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
1730      }
1731    } catch (IOException e) {
1732      throw new ServiceException(e);
1733    }
1734    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
1735    builder.addAllRegionLoads(rLoads);
1736    return builder.build();
1737  }
1738
  /**
   * Clears the named compaction queues ("long" and/or "short"). Guarded by a compare-and-set so
   * only one admin can run the operation at a time; a concurrent request is a logged no-op.
   * @throws ServiceException wrapping an IOException for an unknown queue name or a coprocessor
   *                          failure
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
    ClearCompactionQueuesRequest request) throws ServiceException {
    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
      + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
    requestCount.increment();
    // CAS ensures a single in-flight clear; reset in the finally below.
    if (clearCompactionQueues.compareAndSet(false, true)) {
      final CompactSplit compactSplitThread = server.getCompactSplitThread();
      try {
        checkOpen();
        server.getRegionServerCoprocessorHost().preClearCompactionQueues();
        for (String queueName : request.getQueueNameList()) {
          LOG.debug("clear " + queueName + " compaction queue");
          switch (queueName) {
            case "long":
              compactSplitThread.clearLongCompactionsQueue();
              break;
            case "short":
              compactSplitThread.clearShortCompactionsQueue();
              break;
            default:
              LOG.warn("Unknown queue name " + queueName);
              throw new IOException("Unknown queue name " + queueName);
          }
        }
        server.getRegionServerCoprocessorHost().postClearCompactionQueues();
      } catch (IOException ie) {
        throw new ServiceException(ie);
      } finally {
        clearCompactionQueues.set(false);
      }
    } else {
      LOG.warn("Clear compactions queue is executing by other admin.");
    }
    return respBuilder.build();
  }
1777
1778  /**
1779   * Get some information of the region server.
1780   * @param controller the RPC controller
1781   * @param request    the request
1782   */
1783  @Override
1784  @QosPriority(priority = HConstants.ADMIN_QOS)
1785  public GetServerInfoResponse getServerInfo(final RpcController controller,
1786    final GetServerInfoRequest request) throws ServiceException {
1787    try {
1788      checkOpen();
1789    } catch (IOException ie) {
1790      throw new ServiceException(ie);
1791    }
1792    requestCount.increment();
1793    int infoPort = server.getInfoServer() != null ? server.getInfoServer().getPort() : -1;
1794    return ResponseConverter.buildGetServerInfoResponse(server.getServerName(), infoPort);
1795  }
1796
1797  @Override
1798  @QosPriority(priority = HConstants.ADMIN_QOS)
1799  public GetStoreFileResponse getStoreFile(final RpcController controller,
1800    final GetStoreFileRequest request) throws ServiceException {
1801    try {
1802      checkOpen();
1803      HRegion region = getRegion(request.getRegion());
1804      requestCount.increment();
1805      Set<byte[]> columnFamilies;
1806      if (request.getFamilyCount() == 0) {
1807        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
1808      } else {
1809        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
1810        for (ByteString cf : request.getFamilyList()) {
1811          columnFamilies.add(cf.toByteArray());
1812        }
1813      }
1814      int nCF = columnFamilies.size();
1815      List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
1816      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1817      builder.addAllStoreFile(fileList);
1818      return builder.build();
1819    } catch (IOException ie) {
1820      throw new ServiceException(ie);
1821    }
1822  }
1823
1824  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
1825    if (!request.hasServerStartCode()) {
1826      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
1827      return;
1828    }
1829    throwOnWrongStartCode(request.getServerStartCode());
1830  }
1831
1832  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
1833    if (!request.hasServerStartCode()) {
1834      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
1835      return;
1836    }
1837    throwOnWrongStartCode(request.getServerStartCode());
1838  }
1839
1840  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
1841    // check that we are the same server that this RPC is intended for.
1842    if (server.getServerName().getStartcode() != serverStartCode) {
1843      throw new ServiceException(new DoNotRetryIOException(
1844        "This RPC was intended for a " + "different server with startCode: " + serverStartCode
1845          + ", this server is: " + server.getServerName()));
1846    }
1847  }
1848
1849  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
1850    if (req.getOpenRegionCount() > 0) {
1851      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
1852        throwOnWrongStartCode(openReq);
1853      }
1854    }
1855    if (req.getCloseRegionCount() > 0) {
1856      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
1857        throwOnWrongStartCode(closeReq);
1858      }
1859    }
1860  }
1861
1862  /**
1863   * Open asynchronously a region or a set of regions on the region server. The opening is
1864   * coordinated by ZooKeeper, and this method requires the znode to be created before being called.
1865   * As a consequence, this method should be called only from the master.
1866   * <p>
1867   * Different manages states for the region are:
1868   * </p>
1869   * <ul>
1870   * <li>region not opened: the region opening will start asynchronously.</li>
1871   * <li>a close is already in progress: this is considered as an error.</li>
1872   * <li>an open is already in progress: this new open request will be ignored. This is important
1873   * because the Master can do multiple requests if it crashes.</li>
1874   * <li>the region is already opened: this new open request will be ignored.</li>
1875   * </ul>
1876   * <p>
1877   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
1878   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
1879   * errors are put in the response as FAILED_OPENING.
1880   * </p>
1881   * @param controller the RPC controller
1882   * @param request    the request
1883   */
1884  @Override
1885  @QosPriority(priority = HConstants.ADMIN_QOS)
1886  public OpenRegionResponse openRegion(final RpcController controller,
1887    final OpenRegionRequest request) throws ServiceException {
1888    requestCount.increment();
1889    throwOnWrongStartCode(request);
1890
1891    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
1892    final int regionCount = request.getOpenInfoCount();
1893    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
1894    final boolean isBulkAssign = regionCount > 1;
1895    try {
1896      checkOpen();
1897    } catch (IOException ie) {
1898      TableName tableName = null;
1899      if (regionCount == 1) {
1900        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
1901          request.getOpenInfo(0).getRegion();
1902        if (ri != null) {
1903          tableName = ProtobufUtil.toTableName(ri.getTableName());
1904        }
1905      }
1906      if (!TableName.META_TABLE_NAME.equals(tableName)) {
1907        throw new ServiceException(ie);
1908      }
1909      // We are assigning meta, wait a little for regionserver to finish initialization.
1910      // Default to quarter of RPC timeout
1911      int timeout = server.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1912        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
1913      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
1914      synchronized (server.online) {
1915        try {
1916          while (
1917            EnvironmentEdgeManager.currentTime() <= endTime && !server.isStopped()
1918              && !server.isOnline()
1919          ) {
1920            server.online.wait(server.getMsgInterval());
1921          }
1922          checkOpen();
1923        } catch (InterruptedException t) {
1924          Thread.currentThread().interrupt();
1925          throw new ServiceException(t);
1926        } catch (IOException e) {
1927          throw new ServiceException(e);
1928        }
1929      }
1930    }
1931
1932    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1933
1934    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
1935      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
1936      TableDescriptor htd;
1937      try {
1938        String encodedName = region.getEncodedName();
1939        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
1940        final HRegion onlineRegion = server.getRegion(encodedName);
1941        if (onlineRegion != null) {
1942          // The region is already online. This should not happen any more.
1943          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
1944            + ", which is already online";
1945          LOG.warn(error);
1946          // server.abort(error);
1947          // throw new IOException(error);
1948          builder.addOpeningState(RegionOpeningState.OPENED);
1949          continue;
1950        }
1951        LOG.info("Open " + region.getRegionNameAsString());
1952
1953        final Boolean previous =
1954          server.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);
1955
1956        if (Boolean.FALSE.equals(previous)) {
1957          if (server.getRegion(encodedName) != null) {
1958            // There is a close in progress. This should not happen any more.
1959            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
1960              + ", which we are already trying to CLOSE";
1961            server.abort(error);
1962            throw new IOException(error);
1963          }
1964          server.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
1965        }
1966
1967        if (Boolean.TRUE.equals(previous)) {
1968          // An open is in progress. This is supported, but let's log this.
1969          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
1970            + ", which we are already trying to OPEN"
1971            + " - ignoring this new request for this region.");
1972        }
1973
1974        // We are opening this region. If it moves back and forth for whatever reason, we don't
1975        // want to keep returning the stale moved record while we are opening/if we close again.
1976        server.removeFromMovedRegions(region.getEncodedName());
1977
1978        if (previous == null || !previous.booleanValue()) {
1979          htd = htds.get(region.getTable());
1980          if (htd == null) {
1981            htd = server.getTableDescriptors().get(region.getTable());
1982            htds.put(region.getTable(), htd);
1983          }
1984          if (htd == null) {
1985            throw new IOException("Missing table descriptor for " + region.getEncodedName());
1986          }
1987          // If there is no action in progress, we can submit a specific handler.
1988          // Need to pass the expected version in the constructor.
1989          if (server.getExecutorService() == null) {
1990            LOG.info("No executor executorService; skipping open request");
1991          } else {
1992            if (region.isMetaRegion()) {
1993              server.getExecutorService()
1994                .submit(new OpenMetaHandler(server, server, region, htd, masterSystemTime));
1995            } else {
1996              if (regionOpenInfo.getFavoredNodesCount() > 0) {
1997                server.updateRegionFavoredNodesMapping(region.getEncodedName(),
1998                  regionOpenInfo.getFavoredNodesList());
1999              }
2000              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
2001                server.getExecutorService().submit(
2002                  new OpenPriorityRegionHandler(server, server, region, htd, masterSystemTime));
2003              } else {
2004                server.getExecutorService()
2005                  .submit(new OpenRegionHandler(server, server, region, htd, masterSystemTime));
2006              }
2007            }
2008          }
2009        }
2010
2011        builder.addOpeningState(RegionOpeningState.OPENED);
2012      } catch (IOException ie) {
2013        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
2014        if (isBulkAssign) {
2015          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
2016        } else {
2017          throw new ServiceException(ie);
2018        }
2019      }
2020    }
2021    return builder.build();
2022  }
2023
2024  /**
2025   * Warmup a region on this server. This method should only be called by Master. It synchronously
2026   * opens the region and closes the region bringing the most important pages in cache.
2027   */
2028  @Override
2029  public WarmupRegionResponse warmupRegion(final RpcController controller,
2030    final WarmupRegionRequest request) throws ServiceException {
2031    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
2032    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
2033    try {
2034      checkOpen();
2035      String encodedName = region.getEncodedName();
2036      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
2037      final HRegion onlineRegion = server.getRegion(encodedName);
2038      if (onlineRegion != null) {
2039        LOG.info("{} is online; skipping warmup", region);
2040        return response;
2041      }
2042      TableDescriptor htd = server.getTableDescriptors().get(region.getTable());
2043      if (server.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
2044        LOG.info("{} is in transition; skipping warmup", region);
2045        return response;
2046      }
2047      LOG.info("Warmup {}", region.getRegionNameAsString());
2048      HRegion.warmupHRegion(region, htd, server.getWAL(region), server.getConfiguration(), server,
2049        null);
2050    } catch (IOException ie) {
2051      LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
2052      throw new ServiceException(ie);
2053    }
2054
2055    return response;
2056  }
2057
2058  private ExtendedCellScanner getAndReset(RpcController controller) {
2059    HBaseRpcController hrc = (HBaseRpcController) controller;
2060    ExtendedCellScanner cells = hrc.cellScanner();
2061    hrc.setCellScanner(null);
2062    return cells;
2063  }
2064
2065  /**
2066   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
2067   * that the given mutations will be durable on the receiving RS if this method returns without any
2068   * exception.
2069   * @param controller the RPC controller
2070   * @param request    the request
2071   * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for
2072   *             compatibility with old region replica implementation. Now we will use
2073   *             {@code replicateToReplica} method instead.
2074   */
2075  @Deprecated
2076  @Override
2077  @QosPriority(priority = HConstants.REPLAY_QOS)
2078  public ReplicateWALEntryResponse replay(final RpcController controller,
2079    final ReplicateWALEntryRequest request) throws ServiceException {
2080    long before = EnvironmentEdgeManager.currentTime();
2081    CellScanner cells = getAndReset(controller);
2082    try {
2083      checkOpen();
2084      List<WALEntry> entries = request.getEntryList();
2085      if (entries == null || entries.isEmpty()) {
2086        // empty input
2087        return ReplicateWALEntryResponse.newBuilder().build();
2088      }
2089      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
2090      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
2091      RegionCoprocessorHost coprocessorHost =
2092        ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
2093          ? region.getCoprocessorHost()
2094          : null; // do not invoke coprocessors if this is a secondary region replica
2095
2096      // Skip adding the edits to WAL if this is a secondary region replica
2097      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
2098      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
2099
2100      for (WALEntry entry : entries) {
2101        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
2102          throw new NotServingRegionException("Replay request contains entries from multiple "
2103            + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
2104            + entry.getKey().getEncodedRegionName());
2105        }
2106        if (server.nonceManager != null && isPrimary) {
2107          long nonceGroup =
2108            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
2109          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
2110          server.nonceManager.reportOperationFromWal(nonceGroup, nonce,
2111            entry.getKey().getWriteTime());
2112        }
2113        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
2114        List<MutationReplay> edits =
2115          WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
2116        if (edits != null && !edits.isEmpty()) {
2117          // HBASE-17924
2118          // sort to improve lock efficiency
2119          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
2120          long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
2121            ? entry.getKey().getOrigSequenceNumber()
2122            : entry.getKey().getLogSequenceNumber();
2123          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
2124          // check if it's a partial success
2125          for (int i = 0; result != null && i < result.length; i++) {
2126            if (result[i] != OperationStatus.SUCCESS) {
2127              throw new IOException(result[i].getExceptionMsg());
2128            }
2129          }
2130        }
2131      }
2132
2133      // sync wal at the end because ASYNC_WAL is used above
2134      WAL wal = region.getWAL();
2135      if (wal != null) {
2136        wal.sync();
2137      }
2138      return ReplicateWALEntryResponse.newBuilder().build();
2139    } catch (IOException ie) {
2140      throw new ServiceException(ie);
2141    } finally {
2142      final MetricsRegionServer metricsRegionServer = server.getMetrics();
2143      if (metricsRegionServer != null) {
2144        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
2145      }
2146    }
2147  }
2148
2149  /**
2150   * Replay the given changes on a secondary replica
2151   */
2152  @Override
2153  public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
2154    ReplicateWALEntryRequest request) throws ServiceException {
2155    CellScanner cells = getAndReset(controller);
2156    try {
2157      checkOpen();
2158      List<WALEntry> entries = request.getEntryList();
2159      if (entries == null || entries.isEmpty()) {
2160        // empty input
2161        return ReplicateWALEntryResponse.newBuilder().build();
2162      }
2163      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
2164      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
2165      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2166        throw new DoNotRetryIOException(
2167          "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
2168      }
2169      for (WALEntry entry : entries) {
2170        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
2171          throw new NotServingRegionException(
2172            "ReplicateToReplica request contains entries from multiple " + "regions. First region:"
2173              + regionName.toStringUtf8() + " , other region:"
2174              + entry.getKey().getEncodedRegionName());
2175        }
2176        region.replayWALEntry(entry, cells);
2177      }
2178      return ReplicateWALEntryResponse.newBuilder().build();
2179    } catch (IOException ie) {
2180      throw new ServiceException(ie);
2181    }
2182  }
2183
2184  private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
2185    ReplicationSourceService replicationSource = server.getReplicationSourceService();
2186    if (replicationSource == null || entries.isEmpty()) {
2187      return;
2188    }
2189    // We can ensure that all entries are for one peer, so only need to check one entry's
2190    // table name. if the table hit sync replication at peer side and the peer cluster
2191    // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply
2192    // those entries according to the design doc.
2193    TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray());
2194    if (
2195      replicationSource.getSyncReplicationPeerInfoProvider().checkState(table,
2196        RejectReplicationRequestStateChecker.get())
2197    ) {
2198      throw new DoNotRetryIOException(
2199        "Reject to apply to sink cluster because sync replication state of sink cluster "
2200          + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table);
2201    }
2202  }
2203
2204  /**
2205   * Replicate WAL entries on the region server.
2206   * @param controller the RPC controller
2207   * @param request    the request
2208   */
2209  @Override
2210  @QosPriority(priority = HConstants.REPLICATION_QOS)
2211  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
2212    final ReplicateWALEntryRequest request) throws ServiceException {
2213    try {
2214      checkOpen();
2215      if (server.getReplicationSinkService() != null) {
2216        requestCount.increment();
2217        List<WALEntry> entries = request.getEntryList();
2218        checkShouldRejectReplicationRequest(entries);
2219        CellScanner cellScanner = getAndReset(controller);
2220        server.getRegionServerCoprocessorHost().preReplicateLogEntries();
2221        server.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
2222          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
2223          request.getSourceHFileArchiveDirPath());
2224        server.getRegionServerCoprocessorHost().postReplicateLogEntries();
2225        return ReplicateWALEntryResponse.newBuilder().build();
2226      } else {
2227        throw new ServiceException("Replication services are not initialized yet");
2228      }
2229    } catch (IOException ie) {
2230      throw new ServiceException(ie);
2231    }
2232  }
2233
2234  /**
2235   * Roll the WAL writer of the region server.
2236   * @param controller the RPC controller
2237   * @param request    the request
2238   */
2239  @Override
2240  @QosPriority(priority = HConstants.ADMIN_QOS)
2241  public RollWALWriterResponse rollWALWriter(final RpcController controller,
2242    final RollWALWriterRequest request) throws ServiceException {
2243    try {
2244      checkOpen();
2245      requestCount.increment();
2246      server.getRegionServerCoprocessorHost().preRollWALWriterRequest();
2247      server.getWalRoller().requestRollAll();
2248      server.getRegionServerCoprocessorHost().postRollWALWriterRequest();
2249      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
2250      return builder.build();
2251    } catch (IOException ie) {
2252      throw new ServiceException(ie);
2253    }
2254  }
2255
2256  /**
2257   * Stop the region server.
2258   * @param controller the RPC controller
2259   * @param request    the request
2260   */
2261  @Override
2262  @QosPriority(priority = HConstants.ADMIN_QOS)
2263  public StopServerResponse stopServer(final RpcController controller,
2264    final StopServerRequest request) throws ServiceException {
2265    rpcPreCheck("stopServer");
2266    requestCount.increment();
2267    String reason = request.getReason();
2268    server.stop(reason);
2269    return StopServerResponse.newBuilder().build();
2270  }
2271
2272  @Override
2273  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
2274    UpdateFavoredNodesRequest request) throws ServiceException {
2275    rpcPreCheck("updateFavoredNodes");
2276    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
2277    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
2278    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
2279      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
2280      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
2281        server.updateRegionFavoredNodesMapping(hri.getEncodedName(),
2282          regionUpdateInfo.getFavoredNodesList());
2283      }
2284    }
2285    respBuilder.setResponse(openInfoList.size());
2286    return respBuilder.build();
2287  }
2288
2289  /**
2290   * Atomically bulk load several HFiles into an open region
2291   * @return true if successful, false is failed but recoverably (no action)
2292   * @throws ServiceException if failed unrecoverably
2293   */
2294  @Override
2295  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
2296    final BulkLoadHFileRequest request) throws ServiceException {
2297    long start = EnvironmentEdgeManager.currentTime();
2298    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
2299    if (clusterIds.contains(this.server.getClusterId())) {
2300      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
2301    } else {
2302      clusterIds.add(this.server.getClusterId());
2303    }
2304    try {
2305      checkOpen();
2306      requestCount.increment();
2307      HRegion region = getRegion(request.getRegion());
2308      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
2309      long sizeToBeLoaded = -1;
2310
2311      // Check to see if this bulk load would exceed the space quota for this table
2312      if (spaceQuotaEnabled) {
2313        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
2314        SpaceViolationPolicyEnforcement enforcement =
2315          activeSpaceQuotas.getPolicyEnforcement(region);
2316        if (enforcement != null) {
2317          // Bulk loads must still be atomic. We must enact all or none.
2318          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
2319          for (FamilyPath familyPath : request.getFamilyPathList()) {
2320            filePaths.add(familyPath.getPath());
2321          }
2322          // Check if the batch of files exceeds the current quota
2323          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
2324        }
2325      }
2326      // secure bulk load
2327      Map<byte[], List<Path>> map =
2328        server.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
2329      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
2330      builder.setLoaded(map != null);
2331      if (map != null) {
2332        // Treat any negative size as a flag to "ignore" updating the region size as that is
2333        // not possible to occur in real life (cannot bulk load a file with negative size)
2334        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
2335          if (LOG.isTraceEnabled()) {
2336            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
2337              + sizeToBeLoaded + " bytes");
2338          }
2339          // Inform space quotas of the new files for this region
2340          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
2341            sizeToBeLoaded);
2342        }
2343      }
2344      return builder.build();
2345    } catch (IOException ie) {
2346      throw new ServiceException(ie);
2347    } finally {
2348      final MetricsRegionServer metricsRegionServer = server.getMetrics();
2349      if (metricsRegionServer != null) {
2350        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
2351      }
2352    }
2353  }
2354
2355  @Override
2356  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
2357    PrepareBulkLoadRequest request) throws ServiceException {
2358    try {
2359      checkOpen();
2360      requestCount.increment();
2361
2362      HRegion region = getRegion(request.getRegion());
2363
2364      String bulkToken = server.getSecureBulkLoadManager().prepareBulkLoad(region, request);
2365      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
2366      builder.setBulkToken(bulkToken);
2367      return builder.build();
2368    } catch (IOException ie) {
2369      throw new ServiceException(ie);
2370    }
2371  }
2372
2373  @Override
2374  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
2375    CleanupBulkLoadRequest request) throws ServiceException {
2376    try {
2377      checkOpen();
2378      requestCount.increment();
2379
2380      HRegion region = getRegion(request.getRegion());
2381
2382      server.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
2383      return CleanupBulkLoadResponse.newBuilder().build();
2384    } catch (IOException ie) {
2385      throw new ServiceException(ie);
2386    }
2387  }
2388
2389  @Override
2390  public CoprocessorServiceResponse execService(final RpcController controller,
2391    final CoprocessorServiceRequest request) throws ServiceException {
2392    try {
2393      checkOpen();
2394      requestCount.increment();
2395      HRegion region = getRegion(request.getRegion());
2396      Message result = execServiceOnRegion(region, request.getCall());
2397      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
2398      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
2399        region.getRegionInfo().getRegionName()));
2400      // TODO: COPIES!!!!!!
2401      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
2402        org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
2403      return builder.build();
2404    } catch (IOException ie) {
2405      throw new ServiceException(ie);
2406    }
2407  }
2408
2409  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
2410    if (filePaths.isEmpty()) {
2411      // local hdfs
2412      return server.getFileSystem();
2413    }
2414    // source hdfs
2415    return new Path(filePaths.get(0)).getFileSystem(server.getConfiguration());
2416  }
2417
2418  private Message execServiceOnRegion(HRegion region,
2419    final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
2420    // ignore the passed in controller (from the serialized call)
2421    ServerRpcController execController = new ServerRpcController();
2422    return region.execService(execController, serviceCall);
2423  }
2424
2425  private boolean shouldRejectRequestsFromClient(HRegion region) {
2426    TableName table = region.getRegionInfo().getTable();
2427    ReplicationSourceService service = server.getReplicationSourceService();
2428    return service != null && service.getSyncReplicationPeerInfoProvider().checkState(table,
2429      RejectRequestsFromClientStateChecker.get());
2430  }
2431
2432  private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
2433    if (shouldRejectRequestsFromClient(region)) {
2434      throw new DoNotRetryIOException(
2435        region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state.");
2436    }
2437  }
2438
2439  /**
2440   * Get data from a table.
2441   * @param controller the RPC controller
2442   * @param request    the get request
2443   */
2444  @Override
2445  public GetResponse get(final RpcController controller, final GetRequest request)
2446    throws ServiceException {
2447    long before = EnvironmentEdgeManager.currentTime();
2448    OperationQuota quota = null;
2449    HRegion region = null;
2450    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2451    try {
2452      checkOpen();
2453      requestCount.increment();
2454      rpcGetRequestCount.increment();
2455      region = getRegion(request.getRegion());
2456      rejectIfInStandByState(region);
2457
2458      GetResponse.Builder builder = GetResponse.newBuilder();
2459      ClientProtos.Get get = request.getGet();
2460      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
2461      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
2462      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
2463      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
2464      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
2465        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
2466          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
2467          + "reverse Scan.");
2468      }
2469      Boolean existence = null;
2470      Result r = null;
2471      quota = getRpcQuotaManager().checkBatchQuota(region, OperationQuota.OperationType.GET);
2472
2473      Get clientGet = ProtobufUtil.toGet(get);
2474      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2475        existence = region.getCoprocessorHost().preExists(clientGet);
2476      }
2477      if (existence == null) {
2478        if (context != null) {
2479          r = get(clientGet, (region), null, context);
2480        } else {
2481          // for test purpose
2482          r = region.get(clientGet);
2483        }
2484        if (get.getExistenceOnly()) {
2485          boolean exists = r.getExists();
2486          if (region.getCoprocessorHost() != null) {
2487            exists = region.getCoprocessorHost().postExists(clientGet, exists);
2488          }
2489          existence = exists;
2490        }
2491      }
2492      if (existence != null) {
2493        ClientProtos.Result pbr =
2494          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
2495        builder.setResult(pbr);
2496      } else if (r != null) {
2497        ClientProtos.Result pbr;
2498        if (
2499          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
2500            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
2501        ) {
2502          pbr = ProtobufUtil.toResultNoData(r);
2503          ((HBaseRpcController) controller).setCellScanner(PrivateCellUtil
2504            .createExtendedCellScanner(PackagePrivateFieldAccessor.getExtendedRawCells(r)));
2505          addSize(context, r);
2506        } else {
2507          pbr = ProtobufUtil.toResult(r);
2508        }
2509        builder.setResult(pbr);
2510      }
2511      // r.cells is null when an table.exists(get) call
2512      if (r != null && r.rawCells() != null) {
2513        quota.addGetResult(r);
2514      }
2515      return builder.build();
2516    } catch (IOException ie) {
2517      throw new ServiceException(ie);
2518    } finally {
2519      final MetricsRegionServer metricsRegionServer = server.getMetrics();
2520      if (metricsRegionServer != null && region != null) {
2521        long blockBytesScanned = context != null ? context.getBlockBytesScanned() : 0;
2522        metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
2523          blockBytesScanned);
2524      }
2525      if (quota != null) {
2526        quota.close();
2527      }
2528    }
2529  }
2530
  /**
   * Server-side execution of a single Get, mirroring {@link HRegion#get} but wiring the scanner
   * into the RPC call's cleanup path instead of closing it inline.
   * <p>
   * NOTE(review): when {@code closeCallBack} is null this method dereferences {@code context}
   * without a null check — callers must pass a non-null context in that case (the public get()
   * RPC path does).
   * @param get           the client Get to execute
   * @param region        the region to read from
   * @param closeCallBack aggregated scanner-close callback used by multi(); null for a single get
   * @param context       the current RPC call context; owns scanner cleanup when closeCallBack is
   *                      null
   * @return the read Result; for existence-only gets the exists flag is derived from the results
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
    RpcCallContext context) throws IOException {
    region.prepareGet(get);
    // Results served by a non-default replica are marked stale.
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        // Coprocessor bypassed the core read; serve whatever it populated.
        region.metricsUpdateForGet();
        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
          stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet();

    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
  }
2580
2581  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
2582    int sum = 0;
2583    String firstRegionName = null;
2584    for (RegionAction regionAction : request.getRegionActionList()) {
2585      if (sum == 0) {
2586        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
2587      }
2588      sum += regionAction.getActionCount();
2589    }
2590    if (sum > rowSizeWarnThreshold) {
2591      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
2592        + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
2593        + RpcServer.getRequestUserName().orElse(null) + "/"
2594        + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
2595      if (rejectRowsWithSizeOverThreshold) {
2596        throw new ServiceException(
2597          "Rejecting large batch operation for current batch with firstRegionName: "
2598            + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
2599            + rowSizeWarnThreshold);
2600      }
2601    }
2602  }
2603
2604  private void failRegionAction(MultiResponse.Builder responseBuilder,
2605    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
2606    CellScanner cellScanner, Throwable error) {
2607    rpcServer.getMetrics().exception(error);
2608    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
2609    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2610    // All Mutations in this RegionAction not executed as we can not see the Region online here
2611    // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
2612    // corresponding to these Mutations.
2613    if (cellScanner != null) {
2614      skipCellsForMutations(regionAction.getActionList(), cellScanner);
2615    }
2616  }
2617
2618  private boolean isReplicationRequest(Action action) {
2619    // replication request can only be put or delete.
2620    if (!action.hasMutation()) {
2621      return false;
2622    }
2623    MutationProto mutation = action.getMutation();
2624    MutationType type = mutation.getMutateType();
2625    if (type != MutationType.PUT && type != MutationType.DELETE) {
2626      return false;
2627    }
2628    // replication will set a special attribute so we can make use of it to decide whether a request
2629    // is for replication.
2630    return mutation.getAttributeList().stream().map(p -> p.getName())
2631      .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
2632  }
2633
2634  /**
2635   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2636   * @param rpcc    the RPC controller
2637   * @param request the multi request
2638   */
2639  @Override
2640  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2641    throws ServiceException {
2642    try {
2643      checkOpen();
2644    } catch (IOException ie) {
2645      throw new ServiceException(ie);
2646    }
2647
2648    checkBatchSizeAndLogLargeSize(request);
2649
2650    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2651    // It is also the conduit via which we pass back data.
2652    HBaseRpcController controller = (HBaseRpcController) rpcc;
2653    CellScanner cellScanner = controller != null ? getAndReset(controller) : null;
2654
2655    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2656
2657    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2658    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2659    this.rpcMultiRequestCount.increment();
2660    this.requestCount.increment();
2661    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2662
2663    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
2664    // following logic is for backward compatibility as old clients still use
2665    // MultiRequest#condition in case of checkAndMutate with RowMutations.
2666    if (request.hasCondition()) {
2667      if (request.getRegionActionList().isEmpty()) {
2668        // If the region action list is empty, do nothing.
2669        responseBuilder.setProcessed(true);
2670        return responseBuilder.build();
2671      }
2672
2673      RegionAction regionAction = request.getRegionAction(0);
2674
2675      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
2676      // we can assume regionAction.getAtomic() is true here.
2677      assert regionAction.getAtomic();
2678
2679      OperationQuota quota;
2680      HRegion region;
2681      RegionSpecifier regionSpecifier = regionAction.getRegion();
2682
2683      try {
2684        region = getRegion(regionSpecifier);
2685        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
2686          regionAction.hasCondition());
2687      } catch (IOException e) {
2688        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2689        return responseBuilder.build();
2690      }
2691
2692      try {
2693        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
2694        // We only allow replication in standby state and it will not set the atomic flag.
2695        if (rejectIfFromClient) {
2696          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2697            new DoNotRetryIOException(
2698              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2699          return responseBuilder.build();
2700        }
2701
2702        try {
2703          CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2704            cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
2705          responseBuilder.setProcessed(result.isSuccess());
2706          ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2707            ClientProtos.ResultOrException.newBuilder();
2708          for (int i = 0; i < regionAction.getActionCount(); i++) {
2709            // To unify the response format with doNonAtomicRegionMutation and read through
2710            // client's AsyncProcess we have to add an empty result instance per operation
2711            resultOrExceptionOrBuilder.clear();
2712            resultOrExceptionOrBuilder.setIndex(i);
2713            regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2714          }
2715        } catch (IOException e) {
2716          rpcServer.getMetrics().exception(e);
2717          // As it's an atomic operation with a condition, we may expect it's a global failure.
2718          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2719        }
2720      } finally {
2721        quota.close();
2722      }
2723
2724      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2725      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2726      if (regionLoadStats != null) {
2727        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
2728          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
2729      }
2730      return responseBuilder.build();
2731    }
2732
2733    // this will contain all the cells that we need to return. It's created later, if needed.
2734    List<ExtendedCellScannable> cellsToReturn = null;
2735    RegionScannersCloseCallBack closeCallBack = null;
2736    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2737    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
2738      new HashMap<>(request.getRegionActionCount());
2739
2740    for (RegionAction regionAction : request.getRegionActionList()) {
2741      OperationQuota quota;
2742      HRegion region;
2743      RegionSpecifier regionSpecifier = regionAction.getRegion();
2744      regionActionResultBuilder.clear();
2745
2746      try {
2747        region = getRegion(regionSpecifier);
2748        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
2749          regionAction.hasCondition());
2750      } catch (IOException e) {
2751        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2752        continue; // For this region it's a failure.
2753      }
2754
2755      try {
2756        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
2757
2758        if (regionAction.hasCondition()) {
2759          // We only allow replication in standby state and it will not set the atomic flag.
2760          if (rejectIfFromClient) {
2761            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2762              new DoNotRetryIOException(
2763                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2764            continue;
2765          }
2766
2767          try {
2768            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2769              ClientProtos.ResultOrException.newBuilder();
2770            if (regionAction.getActionCount() == 1) {
2771              CheckAndMutateResult result =
2772                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
2773                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
2774              regionActionResultBuilder.setProcessed(result.isSuccess());
2775              resultOrExceptionOrBuilder.setIndex(0);
2776              if (result.getResult() != null) {
2777                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
2778              }
2779              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2780            } else {
2781              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2782                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
2783              regionActionResultBuilder.setProcessed(result.isSuccess());
2784              for (int i = 0; i < regionAction.getActionCount(); i++) {
2785                if (i == 0 && result.getResult() != null) {
2786                  // Set the result of the Increment/Append operations to the first element of the
2787                  // ResultOrException list
2788                  resultOrExceptionOrBuilder.setIndex(i);
2789                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
2790                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
2791                  continue;
2792                }
2793                // To unify the response format with doNonAtomicRegionMutation and read through
2794                // client's AsyncProcess we have to add an empty result instance per operation
2795                resultOrExceptionOrBuilder.clear();
2796                resultOrExceptionOrBuilder.setIndex(i);
2797                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2798              }
2799            }
2800          } catch (IOException e) {
2801            rpcServer.getMetrics().exception(e);
2802            // As it's an atomic operation with a condition, we may expect it's a global failure.
2803            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2804          }
2805        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2806          // We only allow replication in standby state and it will not set the atomic flag.
2807          if (rejectIfFromClient) {
2808            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2809              new DoNotRetryIOException(
2810                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2811            continue;
2812          }
2813          try {
2814            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
2815              cellScanner, nonceGroup, spaceQuotaEnforcement);
2816            regionActionResultBuilder.setProcessed(true);
2817            // We no longer use MultiResponse#processed. Instead, we use
2818            // RegionActionResult#processed. This is for backward compatibility for old clients.
2819            responseBuilder.setProcessed(true);
2820          } catch (IOException e) {
2821            rpcServer.getMetrics().exception(e);
2822            // As it's atomic, we may expect it's a global failure.
2823            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2824          }
2825        } else {
2826          if (
2827            rejectIfFromClient && regionAction.getActionCount() > 0
2828              && !isReplicationRequest(regionAction.getAction(0))
2829          ) {
2830            // fail if it is not a replication request
2831            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2832              new DoNotRetryIOException(
2833                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2834            continue;
2835          }
2836          // doNonAtomicRegionMutation manages the exception internally
2837          if (context != null && closeCallBack == null) {
2838            // An RpcCallBack that creates a list of scanners that needs to perform callBack
2839            // operation on completion of multiGets.
2840            // Set this only once
2841            closeCallBack = new RegionScannersCloseCallBack();
2842            context.setCallBack(closeCallBack);
2843          }
2844          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2845            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
2846            spaceQuotaEnforcement);
2847        }
2848      } finally {
2849        quota.close();
2850      }
2851
2852      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2853      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2854      if (regionLoadStats != null) {
2855        regionStats.put(regionSpecifier, regionLoadStats);
2856      }
2857    }
2858    // Load the controller with the Cells to return.
2859    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2860      controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(cellsToReturn));
2861    }
2862
2863    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
2864    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
2865      builder.addRegion(stat.getKey());
2866      builder.addStat(stat.getValue());
2867    }
2868    responseBuilder.setRegionStatistics(builder);
2869    return responseBuilder.build();
2870  }
2871
2872  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2873    if (cellScanner == null) {
2874      return;
2875    }
2876    for (Action action : actions) {
2877      skipCellsForMutation(action, cellScanner);
2878    }
2879  }
2880
2881  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2882    if (cellScanner == null) {
2883      return;
2884    }
2885    try {
2886      if (action.hasMutation()) {
2887        MutationProto m = action.getMutation();
2888        if (m.hasAssociatedCellCount()) {
2889          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2890            cellScanner.advance();
2891          }
2892        }
2893      }
2894    } catch (IOException e) {
2895      // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2896      // marked as failed as we could not see the Region here. At client side the top level
2897      // RegionAction exception will be considered first.
2898      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2899    }
2900  }
2901
2902  /**
2903   * Mutate data in a table.
2904   * @param rpcc    the RPC controller
2905   * @param request the mutate request
2906   */
2907  @Override
2908  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
2909    throws ServiceException {
2910    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2911    // It is also the conduit via which we pass back data.
2912    HBaseRpcController controller = (HBaseRpcController) rpcc;
2913    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2914    OperationQuota quota = null;
2915    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2916    // Clear scanner so we are not holding on to reference across call.
2917    if (controller != null) {
2918      controller.setCellScanner(null);
2919    }
2920    try {
2921      checkOpen();
2922      requestCount.increment();
2923      rpcMutateRequestCount.increment();
2924      HRegion region = getRegion(request.getRegion());
2925      rejectIfInStandByState(region);
2926      MutateResponse.Builder builder = MutateResponse.newBuilder();
2927      MutationProto mutation = request.getMutation();
2928      if (!region.getRegionInfo().isMetaRegion()) {
2929        server.getMemStoreFlusher().reclaimMemStoreMemory();
2930      }
2931      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2932      OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request);
2933      quota = getRpcQuotaManager().checkBatchQuota(region, operationType);
2934      ActivePolicyEnforcement spaceQuotaEnforcement =
2935        getSpaceQuotaManager().getActiveEnforcements();
2936
2937      if (request.hasCondition()) {
2938        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
2939          request.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
2940        builder.setProcessed(result.isSuccess());
2941        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
2942        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
2943        if (clientCellBlockSupported) {
2944          addSize(context, result.getResult());
2945        }
2946      } else {
2947        Result r = null;
2948        Boolean processed = null;
2949        MutationType type = mutation.getMutateType();
2950        switch (type) {
2951          case APPEND:
2952            // TODO: this doesn't actually check anything.
2953            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
2954              context);
2955            break;
2956          case INCREMENT:
2957            // TODO: this doesn't actually check anything.
2958            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
2959              context);
2960            break;
2961          case PUT:
2962            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
2963            processed = Boolean.TRUE;
2964            break;
2965          case DELETE:
2966            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
2967            processed = Boolean.TRUE;
2968            break;
2969          default:
2970            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
2971        }
2972        if (processed != null) {
2973          builder.setProcessed(processed);
2974        }
2975        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
2976        addResult(builder, r, controller, clientCellBlockSupported);
2977        if (clientCellBlockSupported) {
2978          addSize(context, r);
2979        }
2980      }
2981      return builder.build();
2982    } catch (IOException ie) {
2983      server.checkFileSystem();
2984      throw new ServiceException(ie);
2985    } finally {
2986      if (quota != null) {
2987        quota.close();
2988      }
2989    }
2990  }
2991
2992  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
2993    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
2994    long before = EnvironmentEdgeManager.currentTime();
2995    Put put = ProtobufUtil.toPut(mutation, cellScanner);
2996    checkCellSizeLimit(region, put);
2997    spaceQuota.getPolicyEnforcement(region).check(put);
2998    quota.addMutation(put);
2999    region.put(put);
3000
3001    MetricsRegionServer metricsRegionServer = server.getMetrics();
3002    if (metricsRegionServer != null) {
3003      long after = EnvironmentEdgeManager.currentTime();
3004      metricsRegionServer.updatePut(region, after - before);
3005    }
3006  }
3007
3008  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
3009    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
3010    long before = EnvironmentEdgeManager.currentTime();
3011    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
3012    checkCellSizeLimit(region, delete);
3013    spaceQuota.getPolicyEnforcement(region).check(delete);
3014    quota.addMutation(delete);
3015    region.delete(delete);
3016
3017    MetricsRegionServer metricsRegionServer = server.getMetrics();
3018    if (metricsRegionServer != null) {
3019      long after = EnvironmentEdgeManager.currentTime();
3020      metricsRegionServer.updateDelete(region, after - before);
3021    }
3022  }
3023
  /**
   * Executes a single checkAndMutate against {@code region}.
   * <p>
   * The coprocessor {@code preCheckAndMutate} hook runs first; if it returns a non-null result,
   * both the region operation and the {@code postCheckAndMutate} hook are bypassed. On success,
   * updates the checkAndMutate metric (with block bytes scanned, when a call context is
   * available) and additionally the checkAndPut/checkAndDelete metric matching the mutation
   * type.
   * @param region      the region to mutate
   * @param quota       operation quota the mutation is charged against
   * @param mutation    the mutation protobuf, possibly carrying a nonce
   * @param cellScanner carries the mutation's cells when sent as a cell block
   * @param condition   the condition that must hold for the mutation to be applied
   * @param nonceGroup  client nonce group used for operation idempotency
   * @param spaceQuota  active space-quota policy enforcement
   * @param context     RPC call context, used to measure block bytes scanned; may be null
   * @return the checkAndMutate result (from the coprocessor or the region)
   * @throws IOException if the size check, quota check or region operation fails
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    // Snapshot block bytes scanned so we can report only this operation's delta.
    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
    CheckAndMutateResult result = null;
    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
    quota.addMutation((Mutation) checkAndMutate.getAction());

    if (region.getCoprocessorHost() != null) {
      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
    }
    if (result == null) {
      // The pre hook did not short-circuit: run the actual region operation, then the post hook.
      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
      if (region.getCoprocessorHost() != null) {
        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
      }
    }
    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      long blockBytesScanned =
        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
      metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned);

      MutationType type = mutation.getMutateType();
      switch (type) {
        case PUT:
          metricsRegionServer.updateCheckAndPut(region, after - before);
          break;
        case DELETE:
          metricsRegionServer.updateCheckAndDelete(region, after - before);
          break;
        default:
          break;
      }
    }
    return result;
  }
3066
  // This is used to keep compatible with the old client implementation. Consider remove it if we
  // decide to drop the support of the client that still sends close request to a region scanner
  // which has already been exhausted.
  @Deprecated
  private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {

    private static final long serialVersionUID = -4305297078988180130L;

    // Shared singleton: suppress stack trace capture, both because filling it is costly on
    // every throw and because the static-initializer trace would be meaningless to callers.
    @Override
    public synchronized Throwable fillInStackTrace() {
      return this;
    }
  };
3080
3081  private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
3082    String scannerName = toScannerName(request.getScannerId());
3083    RegionScannerHolder rsh = this.scanners.get(scannerName);
3084    if (rsh == null) {
3085      // just ignore the next or close request if scanner does not exists.
3086      Long lastCallSeq = closedScanners.getIfPresent(scannerName);
3087      if (lastCallSeq != null) {
3088        // Check the sequence number to catch if the last call was incorrectly retried.
3089        // The only allowed scenario is when the scanner is exhausted and one more scan
3090        // request arrives - in this case returning 0 rows is correct.
3091        if (request.hasNextCallSeq() && request.getNextCallSeq() != lastCallSeq + 1) {
3092          throw new OutOfOrderScannerNextException("Expected nextCallSeq for closed request: "
3093            + (lastCallSeq + 1) + " But the nextCallSeq got from client: "
3094            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
3095        } else {
3096          throw SCANNER_ALREADY_CLOSED;
3097        }
3098      } else {
3099        LOG.warn("Client tried to access missing scanner " + scannerName);
3100        throw new UnknownScannerException(
3101          "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
3102            + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
3103            + "long wait between consecutive client checkins, c) Server may be closing down, "
3104            + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
3105            + "possible fix would be increasing the value of"
3106            + "'hbase.client.scanner.timeout.period' configuration.");
3107      }
3108    }
3109    rejectIfInStandByState(rsh.r);
3110    RegionInfo hri = rsh.s.getRegionInfo();
3111    // Yes, should be the same instance
3112    if (server.getOnlineRegion(hri.getRegionName()) != rsh.r) {
3113      String msg = "Region has changed on the scanner " + scannerName + ": regionName="
3114        + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r;
3115      LOG.warn(msg + ", closing...");
3116      scanners.remove(scannerName);
3117      try {
3118        rsh.s.close();
3119      } catch (IOException e) {
3120        LOG.warn("Getting exception closing " + scannerName, e);
3121      } finally {
3122        try {
3123          server.getLeaseManager().cancelLease(scannerName);
3124        } catch (LeaseException e) {
3125          LOG.warn("Getting exception closing " + scannerName, e);
3126        }
3127      }
3128      throw new NotServingRegionException(msg);
3129    }
3130    return rsh;
3131  }
3132
3133  /**
3134   * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
3135   *         value.
3136   */
3137  private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request,
3138    ScanResponse.Builder builder) throws IOException {
3139    HRegion region = getRegion(request.getRegion());
3140    rejectIfInStandByState(region);
3141    ClientProtos.Scan protoScan = request.getScan();
3142    boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
3143    Scan scan = ProtobufUtil.toScan(protoScan);
3144    // if the request doesn't set this, get the default region setting.
3145    if (!isLoadingCfsOnDemandSet) {
3146      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
3147    }
3148
3149    if (!scan.hasFamilies()) {
3150      // Adding all families to scanner
3151      for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
3152        scan.addFamily(family);
3153      }
3154    }
3155    if (region.getCoprocessorHost() != null) {
3156      // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
3157      // wrapper for the core created RegionScanner
3158      region.getCoprocessorHost().preScannerOpen(scan);
3159    }
3160    RegionScannerImpl coreScanner = region.getScanner(scan);
3161    Shipper shipper = coreScanner;
3162    RegionScanner scanner = coreScanner;
3163    try {
3164      if (region.getCoprocessorHost() != null) {
3165        scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
3166      }
3167    } catch (Exception e) {
3168      // Although region coprocessor is for advanced users and they should take care of the
3169      // implementation to not damage the HBase system, closing the scanner on exception here does
3170      // not have any bad side effect, so let's do it
3171      scanner.close();
3172      throw e;
3173    }
3174    long scannerId = scannerIdGenerator.generateNewScannerId();
3175    builder.setScannerId(scannerId);
3176    builder.setMvccReadPoint(scanner.getMvccReadPoint());
3177    builder.setTtl(scannerLeaseTimeoutPeriod);
3178    String scannerName = toScannerName(scannerId);
3179
3180    boolean fullRegionScan =
3181      !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region);
3182
3183    return new Pair<String, RegionScannerHolder>(scannerName,
3184      addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan));
3185  }
3186
3187  /**
3188   * The returned String is used as key doing look up of outstanding Scanners in this Servers'
3189   * this.scanners, the Map of outstanding scanners and their current state.
3190   * @param scannerId A scanner long id.
3191   * @return The long id as a String.
3192   */
3193  private static String toScannerName(long scannerId) {
3194    return Long.toString(scannerId);
3195  }
3196
3197  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
3198    throws OutOfOrderScannerNextException {
3199    // if nextCallSeq does not match throw Exception straight away. This needs to be
3200    // performed even before checking of Lease.
3201    // See HBASE-5974
3202    if (request.hasNextCallSeq()) {
3203      long callSeq = request.getNextCallSeq();
3204      if (!rsh.incNextCallSeq(callSeq)) {
3205        throw new OutOfOrderScannerNextException(
3206          "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: "
3207            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
3208      }
3209    }
3210  }
3211
3212  private void addScannerLeaseBack(LeaseManager.Lease lease) {
3213    try {
3214      server.getLeaseManager().addLease(lease);
3215    } catch (LeaseStillHeldException e) {
3216      // should not happen as the scanner id is unique.
3217      throw new AssertionError(e);
3218    }
3219  }
3220
3221  // visible for testing only
3222  long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller,
3223    boolean allowHeartbeatMessages) {
3224    // Set the time limit to be half of the more restrictive timeout value (one of the
3225    // timeout values must be positive). In the event that both values are positive, the
3226    // more restrictive of the two is used to calculate the limit.
3227    if (allowHeartbeatMessages) {
3228      long now = EnvironmentEdgeManager.currentTime();
3229      long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now);
3230      if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) {
3231        long timeLimitDelta;
3232        if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) {
3233          timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout);
3234        } else {
3235          timeLimitDelta =
3236            scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout;
3237        }
3238
3239        // Use half of whichever timeout value was more restrictive... But don't allow
3240        // the time limit to be less than the allowable minimum (could cause an
3241        // immediate timeout before scanning any data).
3242        timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
3243        return now + timeLimitDelta;
3244      }
3245    }
3246    // Default value of timeLimit is negative to indicate no timeLimit should be
3247    // enforced.
3248    return -1L;
3249  }
3250
3251  private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) {
3252    long timeout;
3253    if (controller != null && controller.getCallTimeout() > 0) {
3254      timeout = controller.getCallTimeout();
3255    } else if (rpcTimeout > 0) {
3256      timeout = rpcTimeout;
3257    } else {
3258      return -1;
3259    }
3260    if (call != null) {
3261      timeout -= (now - call.getReceiveTime());
3262    }
3263    // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high.
3264    // return minimum value here in that case so we count this in calculating the final delta.
3265    return Math.max(minimumScanTimeLimitDelta, timeout);
3266  }
3267
3268  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
3269    ScannerContext scannerContext, ScanResponse.Builder builder) {
3270    if (numOfCompleteRows >= limitOfRows) {
3271      if (LOG.isTraceEnabled()) {
3272        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows
3273          + " scannerContext: " + scannerContext);
3274      }
3275      builder.setMoreResults(false);
3276    }
3277  }
3278
3279  // return whether we have more results in region.
3280  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
3281    long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
3282    ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
3283    HRegion region = rsh.r;
3284    RegionScanner scanner = rsh.s;
3285    long maxResultSize;
3286    if (scanner.getMaxResultSize() > 0) {
3287      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
3288    } else {
3289      maxResultSize = maxQuotaResultSize;
3290    }
3291    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
3292    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
3293    // arbitrary 32. TODO: keep record of general size of results being returned.
3294    ArrayList<Cell> values = new ArrayList<>(32);
3295    region.startRegionOperation(Operation.SCAN);
3296    long before = EnvironmentEdgeManager.currentTime();
3297    // Used to check if we've matched the row limit set on the Scan
3298    int numOfCompleteRows = 0;
3299    // Count of times we call nextRaw; can be > numOfCompleteRows.
3300    int numOfNextRawCalls = 0;
3301    try {
3302      int numOfResults = 0;
3303      synchronized (scanner) {
3304        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
3305        boolean clientHandlesPartials =
3306          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
3307        boolean clientHandlesHeartbeats =
3308          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
3309
3310        // On the server side we must ensure that the correct ordering of partial results is
3311        // returned to the client to allow them to properly reconstruct the partial results.
3312        // If the coprocessor host is adding to the result list, we cannot guarantee the
3313        // correct ordering of partial results and so we prevent partial results from being
3314        // formed.
3315        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
3316        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
3317        boolean moreRows = false;
3318
3319        // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
3320        // certain time threshold on the server. When the time threshold is exceeded, the
3321        // server stops the scan and sends back whatever Results it has accumulated within
3322        // that time period (may be empty). Since heartbeat messages have the potential to
3323        // create partial Results (in the event that the timeout occurs in the middle of a
3324        // row), we must only generate heartbeat messages when the client can handle both
3325        // heartbeats AND partials
3326        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
3327
3328        long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);
3329
3330        final LimitScope sizeScope =
3331          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
3332        final LimitScope timeScope =
3333          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
3334
3335        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
3336
3337        // Configure with limits for this RPC. Set keep progress true since size progress
3338        // towards size limit should be kept between calls to nextRaw
3339        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
3340        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
3341        // of heap size occupied by cells(being read). Cell data means its key and value parts.
3342        // maxQuotaResultSize - max results just from server side configuration and quotas, without
3343        // user's specified max. We use this for evaluating limits based on blocks (not cells).
3344        // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
3345        // cell or block size from maximum here so we adhere to total limits of request.
3346        // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
3347        // have accumulated block bytes. If not, this will be 0 for block size.
3348        long maxCellSize = maxResultSize;
3349        long maxBlockSize = maxQuotaResultSize;
3350        if (rpcCall != null) {
3351          maxBlockSize -= rpcCall.getBlockBytesScanned();
3352          maxCellSize -= rpcCall.getResponseCellSize();
3353        }
3354
3355        contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
3356        contextBuilder.setBatchLimit(scanner.getBatch());
3357        contextBuilder.setTimeLimit(timeScope, timeLimit);
3358        contextBuilder.setTrackMetrics(trackMetrics);
3359        ScannerContext scannerContext = contextBuilder.build();
3360        boolean limitReached = false;
3361        while (numOfResults < maxResults) {
3362          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
3363          // batch limit is a limit on the number of cells per Result. Thus, if progress is
3364          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
3365          // reset the batch progress between nextRaw invocations since we don't want the
3366          // batch progress from previous calls to affect future calls
3367          scannerContext.setBatchProgress(0);
3368          assert values.isEmpty();
3369
3370          // Collect values to be returned here
3371          moreRows = scanner.nextRaw(values, scannerContext);
3372          if (rpcCall == null) {
            // When there is no RpcCallContext, copy the cells to the heap because the scanner
            // will close right after. This can be an EXPENSIVE call as it may make an extra copy
            // from offheap to onheap buffers. See more details in HBASE-26036.
3376            CellUtil.cloneIfNecessary(values);
3377          }
3378          numOfNextRawCalls++;
3379
3380          if (!values.isEmpty()) {
3381            if (limitOfRows > 0) {
3382              // First we need to check if the last result is partial and we have a row change. If
3383              // so then we need to increase the numOfCompleteRows.
3384              if (results.isEmpty()) {
3385                if (
3386                  rsh.rowOfLastPartialResult != null
3387                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
3388                ) {
3389                  numOfCompleteRows++;
3390                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
3391                    builder);
3392                }
3393              } else {
3394                Result lastResult = results.get(results.size() - 1);
3395                if (
3396                  lastResult.mayHaveMoreCellsInRow()
3397                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
3398                ) {
3399                  numOfCompleteRows++;
3400                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
3401                    builder);
3402                }
3403              }
3404              if (builder.hasMoreResults() && !builder.getMoreResults()) {
3405                break;
3406              }
3407            }
3408            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
3409            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
3410            results.add(r);
3411            numOfResults++;
3412            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
3413              numOfCompleteRows++;
3414              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
3415              if (builder.hasMoreResults() && !builder.getMoreResults()) {
3416                break;
3417              }
3418            }
3419          } else if (!moreRows && !results.isEmpty()) {
3420            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
3421            // last result is false. Otherwise it's possible that: the first nextRaw returned
            // because BATCH_LIMIT_REACHED (which happened to exhaust all cells of the scan), so
3423            // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
3424            // return with moreRows=false, which means moreResultsInRegion would be false, it will
3425            // be a contradictory state (HBASE-21206).
3426            int lastIdx = results.size() - 1;
3427            Result r = results.get(lastIdx);
3428            if (r.mayHaveMoreCellsInRow()) {
3429              results.set(lastIdx,
3430                PackagePrivateFieldAccessor.createResult(
3431                  PackagePrivateFieldAccessor.getExtendedRawCells(r), r.getExists(), r.isStale(),
3432                  false));
3433            }
3434          }
3435          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
3436          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
3437          boolean resultsLimitReached = numOfResults >= maxResults;
3438          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;
3439
3440          if (limitReached || !moreRows) {
3441            // With block size limit, we may exceed size limit without collecting any results.
3442            // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
3443            // or cursor if results were collected, for example for cell size or heap size limits.
3444            boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
3445            // We only want to mark a ScanResponse as a heartbeat message in the event that
3446            // there are more values to be read server side. If there aren't more values,
3447            // marking it as a heartbeat is wasteful because the client will need to issue
3448            // another ScanRequest only to realize that they already have all the values
3449            if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
3450              // Heartbeat messages occur when the time limit has been reached, or size limit has
3451              // been reached before collecting any results. This can happen for heavily filtered
3452              // scans which scan over too many blocks.
3453              builder.setHeartbeatMessage(true);
3454              if (rsh.needCursor) {
3455                Cell cursorCell = scannerContext.getLastPeekedCell();
3456                if (cursorCell != null) {
3457                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
3458                }
3459              }
3460            }
3461            break;
3462          }
3463          values.clear();
3464        }
3465        if (rpcCall != null) {
3466          rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
3467        }
3468        builder.setMoreResultsInRegion(moreRows);
3469        // Check to see if the client requested that we track metrics server side. If the
3470        // client requested metrics, retrieve the metrics from the scanner context.
3471        if (trackMetrics) {
3472          // rather than increment yet another counter in StoreScanner, just set the value here
3473          // from block size progress before writing into the response
3474          scannerContext.getMetrics().countOfBlockBytesScanned
3475            .set(scannerContext.getBlockSizeProgress());
3476          if (rpcCall != null) {
3477            scannerContext.getMetrics().fsReadTime.set(rpcCall.getFsReadTime());
3478          }
3479          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
3480          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
3481          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
3482
3483          for (Entry<String, Long> entry : metrics.entrySet()) {
3484            pairBuilder.setName(entry.getKey());
3485            pairBuilder.setValue(entry.getValue());
3486            metricBuilder.addMetrics(pairBuilder.build());
3487          }
3488
3489          builder.setScanMetrics(metricBuilder.build());
3490        }
3491      }
3492    } finally {
3493      region.closeRegionOperation();
3494      // Update serverside metrics, even on error.
3495      long end = EnvironmentEdgeManager.currentTime();
3496
3497      long responseCellSize = 0;
3498      long blockBytesScanned = 0;
3499      if (rpcCall != null) {
3500        responseCellSize = rpcCall.getResponseCellSize();
3501        blockBytesScanned = rpcCall.getBlockBytesScanned();
3502        rsh.updateBlockBytesScanned(blockBytesScanned);
3503      }
3504      region.getMetrics().updateScan();
3505      final MetricsRegionServer metricsRegionServer = server.getMetrics();
3506      if (metricsRegionServer != null) {
3507        metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned);
3508        metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls);
3509      }
3510    }
3511    // coprocessor postNext hook
3512    if (region.getCoprocessorHost() != null) {
3513      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
3514    }
3515  }
3516
3517  /**
3518   * Scan data in a table.
3519   * @param controller the RPC controller
3520   * @param request    the scan request
3521   */
3522  @Override
3523  public ScanResponse scan(final RpcController controller, final ScanRequest request)
3524    throws ServiceException {
3525    if (controller != null && !(controller instanceof HBaseRpcController)) {
3526      throw new UnsupportedOperationException(
3527        "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
3528    }
3529    if (!request.hasScannerId() && !request.hasScan()) {
3530      throw new ServiceException(
3531        new DoNotRetryIOException("Missing required input: scannerId or scan"));
3532    }
3533    try {
3534      checkOpen();
3535    } catch (IOException e) {
3536      if (request.hasScannerId()) {
3537        String scannerName = toScannerName(request.getScannerId());
3538        if (LOG.isDebugEnabled()) {
3539          LOG.debug(
3540            "Server shutting down and client tried to access missing scanner " + scannerName);
3541        }
3542        final LeaseManager leaseManager = server.getLeaseManager();
3543        if (leaseManager != null) {
3544          try {
3545            leaseManager.cancelLease(scannerName);
3546          } catch (LeaseException le) {
3547            // No problem, ignore
3548            if (LOG.isTraceEnabled()) {
3549              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
3550            }
3551          }
3552        }
3553      }
3554      throw new ServiceException(e);
3555    }
3556    requestCount.increment();
3557    rpcScanRequestCount.increment();
3558    RegionScannerHolder rsh;
3559    ScanResponse.Builder builder = ScanResponse.newBuilder();
3560    String scannerName;
3561    try {
3562      if (request.hasScannerId()) {
3563        // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
3564        // for more details.
3565        long scannerId = request.getScannerId();
3566        builder.setScannerId(scannerId);
3567        scannerName = toScannerName(scannerId);
3568        rsh = getRegionScanner(request);
3569      } else {
3570        Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder);
3571        scannerName = scannerNameAndRSH.getFirst();
3572        rsh = scannerNameAndRSH.getSecond();
3573      }
3574    } catch (IOException e) {
3575      if (e == SCANNER_ALREADY_CLOSED) {
3576        // Now we will close scanner automatically if there are no more results for this region but
3577        // the old client will still send a close request to us. Just ignore it and return.
3578        return builder.build();
3579      }
3580      throw new ServiceException(e);
3581    }
3582    if (rsh.fullRegionScan) {
3583      rpcFullScanRequestCount.increment();
3584    }
3585    HRegion region = rsh.r;
3586    LeaseManager.Lease lease;
3587    try {
3588      // Remove lease while its being processed in server; protects against case
3589      // where processing of request takes > lease expiration time. or null if none found.
3590      lease = server.getLeaseManager().removeLease(scannerName);
3591    } catch (LeaseException e) {
3592      throw new ServiceException(e);
3593    }
3594    if (request.hasRenew() && request.getRenew()) {
3595      // add back and return
3596      addScannerLeaseBack(lease);
3597      try {
3598        checkScanNextCallSeq(request, rsh);
3599      } catch (OutOfOrderScannerNextException e) {
3600        throw new ServiceException(e);
3601      }
3602      return builder.build();
3603    }
3604    OperationQuota quota;
3605    try {
3606      quota = getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize,
3607        rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference());
3608    } catch (IOException e) {
3609      addScannerLeaseBack(lease);
3610      throw new ServiceException(e);
3611    }
3612    try {
3613      checkScanNextCallSeq(request, rsh);
3614    } catch (OutOfOrderScannerNextException e) {
3615      addScannerLeaseBack(lease);
3616      throw new ServiceException(e);
3617    }
3618    // Now we have increased the next call sequence. If we give client an error, the retry will
3619    // never success. So we'd better close the scanner and return a DoNotRetryIOException to client
3620    // and then client will try to open a new scanner.
3621    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
3622    int rows; // this is scan.getCaching
3623    if (request.hasNumberOfRows()) {
3624      rows = request.getNumberOfRows();
3625    } else {
3626      rows = closeScanner ? 0 : 1;
3627    }
3628    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
3629    // now let's do the real scan.
3630    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
3631    RegionScanner scanner = rsh.s;
3632    // this is the limit of rows for this scan, if we the number of rows reach this value, we will
3633    // close the scanner.
3634    int limitOfRows;
3635    if (request.hasLimitOfRows()) {
3636      limitOfRows = request.getLimitOfRows();
3637    } else {
3638      limitOfRows = -1;
3639    }
3640    boolean scannerClosed = false;
3641    try {
3642      List<Result> results = new ArrayList<>(Math.min(rows, 512));
3643      if (rows > 0) {
3644        boolean done = false;
3645        // Call coprocessor. Get region info from scanner.
3646        if (region.getCoprocessorHost() != null) {
3647          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
3648          if (!results.isEmpty()) {
3649            for (Result r : results) {
3650              // add cell size from CP results so we can track response size and update limits
3651              // when calling scan below if !done. We'll also have tracked block size if the CP
3652              // got results from hbase, since StoreScanner tracks that for all calls automatically.
3653              addSize(rpcCall, r);
3654            }
3655          }
3656          if (bypass != null && bypass.booleanValue()) {
3657            done = true;
3658          }
3659        }
3660        if (!done) {
3661          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
3662            results, builder, rpcCall);
3663        } else {
3664          builder.setMoreResultsInRegion(!results.isEmpty());
3665        }
3666      } else {
3667        // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
3668        builder.setMoreResultsInRegion(true);
3669      }
3670
3671      quota.addScanResult(results);
3672      addResults(builder, results, (HBaseRpcController) controller,
3673        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
3674        isClientCellBlockSupport(rpcCall));
3675      if (scanner.isFilterDone() && results.isEmpty()) {
3676        // If the scanner's filter - if any - is done with the scan
3677        // only set moreResults to false if the results is empty. This is used to keep compatible
3678        // with the old scan implementation where we just ignore the returned results if moreResults
3679        // is false. Can remove the isEmpty check after we get rid of the old implementation.
3680        builder.setMoreResults(false);
3681      }
3682      // Later we may close the scanner depending on this flag so here we need to make sure that we
3683      // have already set this flag.
3684      assert builder.hasMoreResultsInRegion();
3685      // we only set moreResults to false in the above code, so set it to true if we haven't set it
3686      // yet.
3687      if (!builder.hasMoreResults()) {
3688        builder.setMoreResults(true);
3689      }
3690      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
3691        // Record the last cell of the last result if it is a partial result
3692        // We need this to calculate the complete rows we have returned to client as the
3693        // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
3694        // current row. We may filter out all the remaining cells for the current row and just
3695        // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
3696        // check for row change.
3697        Result lastResult = results.get(results.size() - 1);
3698        if (lastResult.mayHaveMoreCellsInRow()) {
3699          rsh.rowOfLastPartialResult = lastResult.getRow();
3700        } else {
3701          rsh.rowOfLastPartialResult = null;
3702        }
3703      }
3704      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
3705        scannerClosed = true;
3706        closeScanner(region, scanner, scannerName, rpcCall, false);
3707      }
3708
3709      // There's no point returning to a timed out client. Throwing ensures scanner is closed
3710      if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) {
3711        throw new TimeoutIOException("Client deadline exceeded, cannot return results");
3712      }
3713
3714      return builder.build();
3715    } catch (IOException e) {
3716      try {
3717        // scanner is closed here
3718        scannerClosed = true;
3719        // The scanner state might be left in a dirty state, so we will tell the Client to
3720        // fail this RPC and close the scanner while opening up another one from the start of
3721        // row that the client has last seen.
3722        closeScanner(region, scanner, scannerName, rpcCall, true);
3723
3724        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
3725        // used in two different semantics.
3726        // (1) The first is to close the client scanner and bubble up the exception all the way
3727        // to the application. This is preferred when the exception is really un-recoverable
3728        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
3729        // bucket usually.
3730        // (2) Second semantics is to close the current region scanner only, but continue the
3731        // client scanner by overriding the exception. This is usually UnknownScannerException,
3732        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
3733        // application-level ClientScanner has to continue without bubbling up the exception to
3734        // the client. See ClientScanner code to see how it deals with these special exceptions.
3735        if (e instanceof DoNotRetryIOException) {
3736          throw e;
3737        }
3738
3739        // If it is a FileNotFoundException, wrap as a
3740        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
3741        if (e instanceof FileNotFoundException) {
3742          throw new DoNotRetryIOException(e);
3743        }
3744
3745        // We closed the scanner already. Instead of throwing the IOException, and client
3746        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
3747        // a special exception to save an RPC.
3748        if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
3749          // 1.4.0+ clients know how to handle
3750          throw new ScannerResetException("Scanner is closed on the server-side", e);
3751        } else {
3752          // older clients do not know about SRE. Just throw USE, which they will handle
3753          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
3754            + " scanner state for clients older than 1.3.", e);
3755        }
3756      } catch (IOException ioe) {
3757        throw new ServiceException(ioe);
3758      }
3759    } finally {
3760      if (!scannerClosed) {
3761        // Adding resets expiration time on lease.
3762        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
3763        if (rpcCall != null) {
3764          rpcCall.setCallBack(rsh.shippedCallback);
3765        } else {
3766          // If context is null,here we call rsh.shippedCallback directly to reuse the logic in
3767          // rsh.shippedCallback to release the internal resources in rsh,and lease is also added
3768          // back to regionserver's LeaseManager in rsh.shippedCallback.
3769          runShippedCallback(rsh);
3770        }
3771      }
3772      quota.close();
3773    }
3774  }
3775
3776  private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
3777    assert rsh.shippedCallback != null;
3778    try {
3779      rsh.shippedCallback.run();
3780    } catch (IOException ioe) {
3781      throw new ServiceException(ioe);
3782    }
3783  }
3784
3785  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
3786    RpcCallContext context, boolean isError) throws IOException {
3787    if (region.getCoprocessorHost() != null) {
3788      if (region.getCoprocessorHost().preScannerClose(scanner)) {
3789        // bypass the actual close.
3790        return;
3791      }
3792    }
3793    RegionScannerHolder rsh = scanners.remove(scannerName);
3794    if (rsh != null) {
3795      if (context != null) {
3796        context.setCallBack(rsh.closeCallBack);
3797      } else {
3798        rsh.s.close();
3799      }
3800      if (region.getCoprocessorHost() != null) {
3801        region.getCoprocessorHost().postScannerClose(scanner);
3802      }
3803      if (!isError) {
3804        closedScanners.put(scannerName, rsh.getNextCallSeq());
3805      }
3806    }
3807  }
3808
  /**
   * Executes a region-server-level coprocessor service call by delegating to the server after the
   * standard {@code rpcPreCheck} validation.
   */
  @Override
  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
    CoprocessorServiceRequest request) throws ServiceException {
    rpcPreCheck("execRegionServerService");
    return server.execRegionServerService(controller, request);
  }
3815
3816  @Override
3817  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
3818    GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
3819    try {
3820      final RegionServerSpaceQuotaManager manager = server.getRegionServerSpaceQuotaManager();
3821      final GetSpaceQuotaSnapshotsResponse.Builder builder =
3822        GetSpaceQuotaSnapshotsResponse.newBuilder();
3823      if (manager != null) {
3824        final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
3825        for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
3826          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
3827            .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
3828            .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build());
3829        }
3830      }
3831      return builder.build();
3832    } catch (Exception e) {
3833      throw new ServiceException(e);
3834    }
3835  }
3836
3837  @Override
3838  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
3839    ClearRegionBlockCacheRequest request) throws ServiceException {
3840    try {
3841      rpcPreCheck("clearRegionBlockCache");
3842      ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder();
3843      CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
3844      server.getRegionServerCoprocessorHost().preClearRegionBlockCache();
3845      List<HRegion> regions = getRegions(request.getRegionList(), stats);
3846      for (HRegion region : regions) {
3847        try {
3848          stats = stats.append(this.server.clearRegionBlockCache(region));
3849        } catch (Exception e) {
3850          stats.addException(region.getRegionInfo().getRegionName(), e);
3851        }
3852      }
3853      stats.withMaxCacheSize(server.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
3854      server.getRegionServerCoprocessorHost().postClearRegionBlockCache(stats.build());
3855      return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
3856    } catch (IOException e) {
3857      throw new ServiceException(e);
3858    }
3859  }
3860
3861  private void executeOpenRegionProcedures(OpenRegionRequest request,
3862    Map<TableName, TableDescriptor> tdCache) {
3863    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
3864    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
3865      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
3866      TableName tableName = regionInfo.getTable();
3867      TableDescriptor tableDesc = tdCache.get(tableName);
3868      if (tableDesc == null) {
3869        try {
3870          tableDesc = server.getTableDescriptors().get(regionInfo.getTable());
3871        } catch (IOException e) {
3872          // Here we do not fail the whole method since we also need deal with other
3873          // procedures, and we can not ignore this one, so we still schedule a
3874          // AssignRegionHandler and it will report back to master if we still can not get the
3875          // TableDescriptor.
3876          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
3877            regionInfo.getTable(), e);
3878        }
3879        if (tableDesc != null) {
3880          tdCache.put(tableName, tableDesc);
3881        }
3882      }
3883      if (regionOpenInfo.getFavoredNodesCount() > 0) {
3884        server.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
3885          regionOpenInfo.getFavoredNodesList());
3886      }
3887      long procId = regionOpenInfo.getOpenProcId();
3888      if (server.submitRegionProcedure(procId)) {
3889        server.getExecutorService().submit(
3890          AssignRegionHandler.create(server, regionInfo, procId, tableDesc, masterSystemTime));
3891      }
3892    }
3893  }
3894
3895  private void executeCloseRegionProcedures(CloseRegionRequest request) {
3896    String encodedName;
3897    try {
3898      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
3899    } catch (DoNotRetryIOException e) {
3900      throw new UncheckedIOException("Should not happen", e);
3901    }
3902    ServerName destination = request.hasDestinationServer()
3903      ? ProtobufUtil.toServerName(request.getDestinationServer())
3904      : null;
3905    long procId = request.getCloseProcId();
3906    boolean evictCache = request.getEvictCache();
3907    if (server.submitRegionProcedure(procId)) {
3908      server.getExecutorService().submit(
3909        UnassignRegionHandler.create(server, encodedName, procId, false, destination, evictCache));
3910    }
3911  }
3912
3913  private void executeProcedures(RemoteProcedureRequest request) {
3914    RSProcedureCallable callable;
3915    try {
3916      callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
3917        .getDeclaredConstructor().newInstance();
3918    } catch (Exception e) {
3919      LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(),
3920        request.getProcId(), e);
3921      server.remoteProcedureComplete(request.getProcId(), e);
3922      return;
3923    }
3924    callable.init(request.getProcData().toByteArray(), server);
3925    LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
3926    server.executeProcedure(request.getProcId(), callable);
3927  }
3928
3929  @Override
3930  @QosPriority(priority = HConstants.ADMIN_QOS)
3931  public ExecuteProceduresResponse executeProcedures(RpcController controller,
3932    ExecuteProceduresRequest request) throws ServiceException {
3933    try {
3934      checkOpen();
3935      throwOnWrongStartCode(request);
3936      server.getRegionServerCoprocessorHost().preExecuteProcedures();
3937      if (request.getOpenRegionCount() > 0) {
3938        // Avoid reading from the TableDescritor every time(usually it will read from the file
3939        // system)
3940        Map<TableName, TableDescriptor> tdCache = new HashMap<>();
3941        request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
3942      }
3943      if (request.getCloseRegionCount() > 0) {
3944        request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
3945      }
3946      if (request.getProcCount() > 0) {
3947        request.getProcList().forEach(this::executeProcedures);
3948      }
3949      server.getRegionServerCoprocessorHost().postExecuteProcedures();
3950      return ExecuteProceduresResponse.getDefaultInstance();
3951    } catch (IOException e) {
3952      throw new ServiceException(e);
3953    }
3954  }
3955
3956  @Override
3957  public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller,
3958    GetAllBootstrapNodesRequest request) throws ServiceException {
3959    GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder();
3960    server.getBootstrapNodes()
3961      .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server)));
3962    return builder.build();
3963  }
3964
  /**
   * Re-reads the guardrail settings that may change at runtime without a server restart: the
   * batch row-count warning threshold, whether oversized batches are rejected outright, and the
   * server-side maximum scanner result size.
   */
  private void setReloadableGuardrails(Configuration conf) {
    rowSizeWarnThreshold =
      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    rejectRowsWithSizeOverThreshold =
      conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
    maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
  }
3973
  /**
   * Called when the live configuration is updated; lets the superclass refresh its own reloadable
   * values, then re-reads this class's reloadable guardrail settings.
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    super.onConfigurationChange(conf);
    setReloadableGuardrails(conf);
  }
3979
3980  @Override
3981  public GetCachedFilesListResponse getCachedFilesList(RpcController controller,
3982    GetCachedFilesListRequest request) throws ServiceException {
3983    GetCachedFilesListResponse.Builder responseBuilder = GetCachedFilesListResponse.newBuilder();
3984    List<String> fullyCachedFiles = new ArrayList<>();
3985    server.getBlockCache().flatMap(BlockCache::getFullyCachedFiles).ifPresent(fcf -> {
3986      fullyCachedFiles.addAll(fcf.keySet());
3987    });
3988    return responseBuilder.addAllCachedFiles(fullyCachedFiles).build();
3989  }
3990}