001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.io.UncheckedIOException;
024import java.net.BindException;
025import java.net.InetAddress;
026import java.net.InetSocketAddress;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.Iterator;
032import java.util.List;
033import java.util.Map;
034import java.util.Map.Entry;
035import java.util.NavigableMap;
036import java.util.Set;
037import java.util.TreeSet;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentMap;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicBoolean;
042import java.util.concurrent.atomic.AtomicLong;
043import java.util.concurrent.atomic.LongAdder;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.FileSystem;
046import org.apache.hadoop.fs.Path;
047import org.apache.hadoop.hbase.CacheEvictionStats;
048import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
049import org.apache.hadoop.hbase.Cell;
050import org.apache.hadoop.hbase.CellScannable;
051import org.apache.hadoop.hbase.CellScanner;
052import org.apache.hadoop.hbase.CellUtil;
053import org.apache.hadoop.hbase.DoNotRetryIOException;
054import org.apache.hadoop.hbase.DroppedSnapshotException;
055import org.apache.hadoop.hbase.HBaseIOException;
056import org.apache.hadoop.hbase.HBaseRpcServicesBase;
057import org.apache.hadoop.hbase.HConstants;
058import org.apache.hadoop.hbase.MultiActionResultTooLarge;
059import org.apache.hadoop.hbase.NotServingRegionException;
060import org.apache.hadoop.hbase.PrivateCellUtil;
061import org.apache.hadoop.hbase.RegionTooBusyException;
062import org.apache.hadoop.hbase.Server;
063import org.apache.hadoop.hbase.ServerName;
064import org.apache.hadoop.hbase.TableName;
065import org.apache.hadoop.hbase.UnknownScannerException;
066import org.apache.hadoop.hbase.client.Append;
067import org.apache.hadoop.hbase.client.CheckAndMutate;
068import org.apache.hadoop.hbase.client.CheckAndMutateResult;
069import org.apache.hadoop.hbase.client.Delete;
070import org.apache.hadoop.hbase.client.Durability;
071import org.apache.hadoop.hbase.client.Get;
072import org.apache.hadoop.hbase.client.Increment;
073import org.apache.hadoop.hbase.client.Mutation;
074import org.apache.hadoop.hbase.client.OperationWithAttributes;
075import org.apache.hadoop.hbase.client.Put;
076import org.apache.hadoop.hbase.client.RegionInfo;
077import org.apache.hadoop.hbase.client.RegionReplicaUtil;
078import org.apache.hadoop.hbase.client.Result;
079import org.apache.hadoop.hbase.client.Row;
080import org.apache.hadoop.hbase.client.Scan;
081import org.apache.hadoop.hbase.client.TableDescriptor;
082import org.apache.hadoop.hbase.client.VersionInfoUtil;
083import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
084import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
085import org.apache.hadoop.hbase.exceptions.ScannerResetException;
086import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
087import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
088import org.apache.hadoop.hbase.io.ByteBuffAllocator;
089import org.apache.hadoop.hbase.io.hfile.BlockCache;
090import org.apache.hadoop.hbase.ipc.HBaseRpcController;
091import org.apache.hadoop.hbase.ipc.PriorityFunction;
092import org.apache.hadoop.hbase.ipc.QosPriority;
093import org.apache.hadoop.hbase.ipc.RpcCall;
094import org.apache.hadoop.hbase.ipc.RpcCallContext;
095import org.apache.hadoop.hbase.ipc.RpcCallback;
096import org.apache.hadoop.hbase.ipc.RpcServer;
097import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
098import org.apache.hadoop.hbase.ipc.RpcServerFactory;
099import org.apache.hadoop.hbase.ipc.RpcServerInterface;
100import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
101import org.apache.hadoop.hbase.ipc.ServerRpcController;
102import org.apache.hadoop.hbase.net.Address;
103import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
104import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
105import org.apache.hadoop.hbase.quotas.OperationQuota;
106import org.apache.hadoop.hbase.quotas.QuotaUtil;
107import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
108import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
109import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
110import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
111import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease;
112import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException;
113import org.apache.hadoop.hbase.regionserver.Region.Operation;
114import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
115import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
116import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler;
117import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
118import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
119import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
120import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
121import org.apache.hadoop.hbase.replication.ReplicationUtils;
122import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker;
123import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker;
124import org.apache.hadoop.hbase.security.Superusers;
125import org.apache.hadoop.hbase.security.access.Permission;
126import org.apache.hadoop.hbase.util.Bytes;
127import org.apache.hadoop.hbase.util.DNS;
128import org.apache.hadoop.hbase.util.DNS.ServerType;
129import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
130import org.apache.hadoop.hbase.util.Pair;
131import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
132import org.apache.hadoop.hbase.wal.WAL;
133import org.apache.hadoop.hbase.wal.WALEdit;
134import org.apache.hadoop.hbase.wal.WALKey;
135import org.apache.hadoop.hbase.wal.WALSplitUtil;
136import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
137import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
138import org.apache.yetus.audience.InterfaceAudience;
139import org.slf4j.Logger;
140import org.slf4j.LoggerFactory;
141
142import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
143import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
144import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
145import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
146import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
147import org.apache.hbase.thirdparty.com.google.protobuf.Message;
148import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
149import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
150import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
151import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
152import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
153
154import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
155import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
156import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
245
246/**
247 * Implements the regionserver RPC services.
248 */
249@InterfaceAudience.Private
250public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
251  implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface {
252
  private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);

  /** Configuration key naming the RPC scheduler factory class to use for the region server. */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch.
  // LongAdder rather than AtomicLong: these counters are update-heavy and read only
  // occasionally (for metrics), which is the workload LongAdder is designed for.
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Maximum cumulative result size a scanner may accumulate per RPC. volatile so writes are
  // visible to concurrent RPC handler threads; presumably refreshed on configuration
  // change — confirm at the write site.
  private volatile long maxScannerResultSize;

  // Produces the ids handed out for client scanners (see ScannerIdGenerator).
  private ScannerIdGenerator scannerIdGenerator;
  // All currently-open client scanners, keyed by scanner name. Entries are removed on close
  // or on lease expiry (see ScannerListener).
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name and last sequence number of a closed scanner for a while. This is used
  // to keep compatible for old clients which may send next or close request to a region
  // scanner which has already been exhausted. The entries will be removed automatically
  // after scannerLeaseTimeoutPeriod.
  private final Cache<String, Long> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private volatile int rowSizeWarnThreshold;
  /*
   * Whether we should reject requests whose row count exceeds the threshold defined by
   * rowSizeWarnThreshold
   */
  private volatile boolean rejectRowsWithSizeOverThreshold;

  // NOTE(review): presumably set while a clear-compaction-queues request is in flight so a
  // concurrent request is rejected — confirm against clearCompactionQueues() usage.
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below booleans
   * to selectively enable/disable these services (Rare is the case where you would ever turn off
   * one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
    "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
    "hbase.regionserver.client.executorService";
  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
    "hbase.regionserver.client.meta.executorService";
  public static final String REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG =
    "hbase.regionserver.bootstrap.nodes.executorService";
351
352  /**
353   * An Rpc callback for closing a RegionScanner.
354   */
355  private static final class RegionScannerCloseCallBack implements RpcCallback {
356
357    private final RegionScanner scanner;
358
359    public RegionScannerCloseCallBack(RegionScanner scanner) {
360      this.scanner = scanner;
361    }
362
363    @Override
364    public void run() throws IOException {
365      this.scanner.close();
366    }
367  }
368
369  /**
370   * An Rpc callback for doing shipped() call on a RegionScanner.
371   */
372  private class RegionScannerShippedCallBack implements RpcCallback {
373    private final String scannerName;
374    private final Shipper shipper;
375    private final Lease lease;
376
377    public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
378      this.scannerName = scannerName;
379      this.shipper = shipper;
380      this.lease = lease;
381    }
382
383    @Override
384    public void run() throws IOException {
385      this.shipper.shipped();
386      // We're done. On way out re-add the above removed lease. The lease was temp removed for this
387      // Rpc call and we are at end of the call now. Time to add it back.
388      if (scanners.containsKey(scannerName)) {
389        if (lease != null) {
390          server.getLeaseManager().addLease(lease);
391        }
392      }
393    }
394  }
395
396  /**
397   * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
398   * completion of multiGets.
399   */
400  static class RegionScannersCloseCallBack implements RpcCallback {
401    private final List<RegionScanner> scanners = new ArrayList<>();
402
403    public void addScanner(RegionScanner scanner) {
404      this.scanners.add(scanner);
405    }
406
407    @Override
408    public void run() {
409      for (RegionScanner scanner : scanners) {
410        try {
411          scanner.close();
412        } catch (IOException e) {
413          LOG.error("Exception while closing the scanner " + scanner, e);
414        }
415      }
416    }
417  }
418
419  /**
420   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
421   */
422  static final class RegionScannerHolder {
423    private final AtomicLong nextCallSeq = new AtomicLong(0);
424    private final RegionScanner s;
425    private final HRegion r;
426    private final RpcCallback closeCallBack;
427    private final RpcCallback shippedCallback;
428    private byte[] rowOfLastPartialResult;
429    private boolean needCursor;
430    private boolean fullRegionScan;
431    private final String clientIPAndPort;
432    private final String userName;
433    private volatile long maxBlockBytesScanned = 0;
434    private volatile long prevBlockBytesScanned = 0;
435    private volatile long prevBlockBytesScannedDifference = 0;
436
437    RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
438      RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
439      String clientIPAndPort, String userName) {
440      this.s = s;
441      this.r = r;
442      this.closeCallBack = closeCallBack;
443      this.shippedCallback = shippedCallback;
444      this.needCursor = needCursor;
445      this.fullRegionScan = fullRegionScan;
446      this.clientIPAndPort = clientIPAndPort;
447      this.userName = userName;
448    }
449
450    long getNextCallSeq() {
451      return nextCallSeq.get();
452    }
453
454    boolean incNextCallSeq(long currentSeq) {
455      // Use CAS to prevent multiple scan request running on the same scanner.
456      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
457    }
458
459    long getMaxBlockBytesScanned() {
460      return maxBlockBytesScanned;
461    }
462
463    long getPrevBlockBytesScannedDifference() {
464      return prevBlockBytesScannedDifference;
465    }
466
467    void updateBlockBytesScanned(long blockBytesScanned) {
468      prevBlockBytesScannedDifference = blockBytesScanned - prevBlockBytesScanned;
469      prevBlockBytesScanned = blockBytesScanned;
470      if (blockBytesScanned > maxBlockBytesScanned) {
471        maxBlockBytesScanned = blockBytesScanned;
472      }
473    }
474
475    // Should be called only when we need to print lease expired messages otherwise
476    // cache the String once made.
477    @Override
478    public String toString() {
479      return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName
480        + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString();
481    }
482  }
483
484  /**
485   * Instantiated as a scanner lease. If the lease times out, the scanner is closed
486   */
487  private class ScannerListener implements LeaseListener {
488    private final String scannerName;
489
490    ScannerListener(final String n) {
491      this.scannerName = n;
492    }
493
494    @Override
495    public void leaseExpired() {
496      RegionScannerHolder rsh = scanners.remove(this.scannerName);
497      if (rsh == null) {
498        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
499        return;
500      }
501      LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
502      server.getMetrics().incrScannerLeaseExpired();
503      RegionScanner s = rsh.s;
504      HRegion region = null;
505      try {
506        region = server.getRegion(s.getRegionInfo().getRegionName());
507        if (region != null && region.getCoprocessorHost() != null) {
508          region.getCoprocessorHost().preScannerClose(s);
509        }
510      } catch (IOException e) {
511        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
512      } finally {
513        try {
514          s.close();
515          if (region != null && region.getCoprocessorHost() != null) {
516            region.getCoprocessorHost().postScannerClose(s);
517          }
518        } catch (IOException e) {
519          LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
520        }
521      }
522    }
523  }
524
525  private static ResultOrException getResultOrException(final ClientProtos.Result r,
526    final int index) {
527    return getResultOrException(ResponseConverter.buildActionResult(r), index);
528  }
529
530  private static ResultOrException getResultOrException(final Exception e, final int index) {
531    return getResultOrException(ResponseConverter.buildActionResult(e), index);
532  }
533
534  private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
535    final int index) {
536    return builder.setIndex(index).build();
537  }
538
539  /**
540   * Checks for the following pre-checks in order:
541   * <ol>
542   * <li>RegionServer is running</li>
543   * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
544   * </ol>
545   * @param requestName name of rpc request. Used in reporting failures to provide context.
546   * @throws ServiceException If any of the above listed pre-check fails.
547   */
548  private void rpcPreCheck(String requestName) throws ServiceException {
549    try {
550      checkOpen();
551      requirePermission(requestName, Permission.Action.ADMIN);
552    } catch (IOException ioe) {
553      throw new ServiceException(ioe);
554    }
555  }
556
557  private boolean isClientCellBlockSupport(RpcCallContext context) {
558    return context != null && context.isClientCellBlockSupported();
559  }
560
561  private void addResult(final MutateResponse.Builder builder, final Result result,
562    final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
563    if (result == null) return;
564    if (clientCellBlockSupported) {
565      builder.setResult(ProtobufUtil.toResultNoData(result));
566      rpcc.setCellScanner(result.cellScanner());
567    } else {
568      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
569      builder.setResult(pbr);
570    }
571  }
572
573  private void addResults(ScanResponse.Builder builder, List<Result> results,
574    HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
575    builder.setStale(!isDefaultRegion);
576    if (results.isEmpty()) {
577      return;
578    }
579    if (clientCellBlockSupported) {
580      for (Result res : results) {
581        builder.addCellsPerResult(res.size());
582        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
583      }
584      controller.setCellScanner(CellUtil.createCellScanner(results));
585    } else {
586      for (Result res : results) {
587        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
588        builder.addResults(pbr);
589      }
590    }
591  }
592
  /**
   * Executes the mutation half of an atomic check-and-mutate from a multi request: converts each
   * proto action (PUT/DELETE/INCREMENT/APPEND only — a Get is rejected) into a client Mutation,
   * applies cell-size and space-quota checks, then runs the batch through the region's
   * checkAndMutate guarded by {@code condition}, honoring coprocessor pre/post hooks.
   * @param region                the region to mutate
   * @param actions               proto actions; each must carry a mutation, never a get
   * @param cellScanner           supplies the cells referenced by the mutations, consumed in
   *                              action order
   * @param condition             the check half of the check-and-mutate
   * @param nonceGroup            nonce group for idempotent increment/append retries
   * @param spaceQuotaEnforcement active space-quota policies each mutation is checked against
   * @return the CheckAndMutateResult; a success with null result when there were no mutations
   * @throws IOException if an action is a Get, the mutation type is unknown, or the region
   *                     operation fails
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
    CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Number of actions whose cells have been fully consumed from cellScanner. On any exit
    // (including exceptions) the finally block skips the cells of the remaining actions so the
    // scanner stays correctly positioned for whatever is processed next.
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      List<Mutation> mutations = new ArrayList<>();
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : actions) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto mutation = action.getMutation();
        MutationType type = mutation.getMutateType();
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(mutation, cellScanner);
            // Count immediately after the conversion consumes this action's cells.
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            mutations.add(put);
            break;
          case DELETE:
            Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
            ++countOfCompleteMutation;
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            mutations.add(del);
            break;
          case INCREMENT:
            Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
            // Only increment/append carry a nonce; the last one seen is used for the batch.
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, increment);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
            mutations.add(increment);
            break;
          case APPEND:
            Append append = ProtobufUtil.toAppend(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, append);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
            mutations.add(append);
            break;
          default:
            throw new DoNotRetryIOException("invalid mutation type : " + type);
        }
      }

      if (mutations.size() == 0) {
        return new CheckAndMutateResult(true, null);
      } else {
        CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
        CheckAndMutateResult result = null;
        if (region.getCoprocessorHost() != null) {
          result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
        }
        if (result == null) {
          // Not bypassed by a coprocessor: run the real check-and-mutate, then give the
          // post hook a chance to transform the result.
          result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
          if (region.getCoprocessorHost() != null) {
            result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
          }
        }
        return result;
      }
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }
669
670  /**
671   * Execute an append mutation.
672   * @return result to return to client if default operation should be bypassed as indicated by
673   *         RegionObserver, null otherwise
674   */
675  private Result append(final HRegion region, final OperationQuota quota,
676    final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
677    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
678    long before = EnvironmentEdgeManager.currentTime();
679    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
680    checkCellSizeLimit(region, append);
681    spaceQuota.getPolicyEnforcement(region).check(append);
682    quota.addMutation(append);
683    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
684    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
685    Result r = region.append(append, nonceGroup, nonce);
686    if (server.getMetrics() != null) {
687      long blockBytesScanned =
688        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
689      server.getMetrics().updateAppend(region, EnvironmentEdgeManager.currentTime() - before,
690        blockBytesScanned);
691    }
692    return r == null ? Result.EMPTY_RESULT : r;
693  }
694
695  /**
696   * Execute an increment mutation.
697   */
698  private Result increment(final HRegion region, final OperationQuota quota,
699    final MutationProto mutation, final CellScanner cells, long nonceGroup,
700    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
701    long before = EnvironmentEdgeManager.currentTime();
702    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
703    checkCellSizeLimit(region, increment);
704    spaceQuota.getPolicyEnforcement(region).check(increment);
705    quota.addMutation(increment);
706    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
707    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
708    Result r = region.increment(increment, nonceGroup, nonce);
709    final MetricsRegionServer metricsRegionServer = server.getMetrics();
710    if (metricsRegionServer != null) {
711      long blockBytesScanned =
712        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
713      metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before,
714        blockBytesScanned);
715    }
716    return r == null ? Result.EMPTY_RESULT : r;
717  }
718
719  /**
720   * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
721   * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
722   * @param cellsToReturn Could be null. May be allocated in this method. This is what this method
723   *                      returns as a 'result'.
724   * @param closeCallBack the callback to be used with multigets
725   * @param context       the current RpcCallContext
726   * @return Return the <code>cellScanner</code> passed
727   */
728  private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
729    final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
730    final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
731    final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
732    ActivePolicyEnforcement spaceQuotaEnforcement) {
733    // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
734    // one at a time, we instead pass them in batch. Be aware that the corresponding
735    // ResultOrException instance that matches each Put or Delete is then added down in the
736    // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
737    // deferred/batched
738    List<ClientProtos.Action> mutations = null;
739    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
740    IOException sizeIOE = null;
741    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
742      ResultOrException.newBuilder();
743    boolean hasResultOrException = false;
744    for (ClientProtos.Action action : actions.getActionList()) {
745      hasResultOrException = false;
746      resultOrExceptionBuilder.clear();
747      try {
748        Result r = null;
749        long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
750        if (
751          context != null && context.isRetryImmediatelySupported()
752            && (context.getResponseCellSize() > maxQuotaResultSize
753              || blockBytesScannedBefore + context.getResponseExceptionSize() > maxQuotaResultSize)
754        ) {
755
756          // We're storing the exception since the exception and reason string won't
757          // change after the response size limit is reached.
758          if (sizeIOE == null) {
759            // We don't need the stack un-winding do don't throw the exception.
760            // Throwing will kill the JVM's JIT.
761            //
762            // Instead just create the exception and then store it.
763            sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
764              + context.getResponseCellSize() + " BlockSize: " + blockBytesScannedBefore);
765
766            // Only report the exception once since there's only one request that
767            // caused the exception. Otherwise this number will dominate the exceptions count.
768            rpcServer.getMetrics().exception(sizeIOE);
769          }
770
771          // Now that there's an exception is known to be created
772          // use it for the response.
773          //
774          // This will create a copy in the builder.
775          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
776          resultOrExceptionBuilder.setException(pair);
777          context.incrementResponseExceptionSize(pair.getSerializedSize());
778          resultOrExceptionBuilder.setIndex(action.getIndex());
779          builder.addResultOrException(resultOrExceptionBuilder.build());
780          skipCellsForMutation(action, cellScanner);
781          continue;
782        }
783        if (action.hasGet()) {
784          long before = EnvironmentEdgeManager.currentTime();
785          ClientProtos.Get pbGet = action.getGet();
786          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
787          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
788          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
789          // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
790          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
791            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
792              + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
793              + "reverse Scan.");
794          }
795          try {
796            Get get = ProtobufUtil.toGet(pbGet);
797            if (context != null) {
798              r = get(get, (region), closeCallBack, context);
799            } else {
800              r = region.get(get);
801            }
802          } finally {
803            final MetricsRegionServer metricsRegionServer = server.getMetrics();
804            if (metricsRegionServer != null) {
805              long blockBytesScanned =
806                context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
807              metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
808                blockBytesScanned);
809            }
810          }
811        } else if (action.hasServiceCall()) {
812          hasResultOrException = true;
813          Message result = execServiceOnRegion(region, action.getServiceCall());
814          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
815            ClientProtos.CoprocessorServiceResult.newBuilder();
816          resultOrExceptionBuilder.setServiceResult(serviceResultBuilder
817            .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName())
818              // TODO: Copy!!!
819              .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
820        } else if (action.hasMutation()) {
821          MutationType type = action.getMutation().getMutateType();
822          if (
823            type != MutationType.PUT && type != MutationType.DELETE && mutations != null
824              && !mutations.isEmpty()
825          ) {
826            // Flush out any Puts or Deletes already collected.
827            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
828              spaceQuotaEnforcement);
829            mutations.clear();
830          }
831          switch (type) {
832            case APPEND:
833              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
834                spaceQuotaEnforcement, context);
835              break;
836            case INCREMENT:
837              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
838                spaceQuotaEnforcement, context);
839              break;
840            case PUT:
841            case DELETE:
842              // Collect the individual mutations and apply in a batch
843              if (mutations == null) {
844                mutations = new ArrayList<>(actions.getActionCount());
845              }
846              mutations.add(action);
847              break;
848            default:
849              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
850          }
851        } else {
852          throw new HBaseIOException("Unexpected Action type");
853        }
854        if (r != null) {
855          ClientProtos.Result pbResult = null;
856          if (isClientCellBlockSupport(context)) {
857            pbResult = ProtobufUtil.toResultNoData(r);
858            // Hard to guess the size here. Just make a rough guess.
859            if (cellsToReturn == null) {
860              cellsToReturn = new ArrayList<>();
861            }
862            cellsToReturn.add(r);
863          } else {
864            pbResult = ProtobufUtil.toResult(r);
865          }
866          addSize(context, r);
867          hasResultOrException = true;
868          resultOrExceptionBuilder.setResult(pbResult);
869        }
870        // Could get to here and there was no result and no exception. Presumes we added
871        // a Put or Delete to the collecting Mutations List for adding later. In this
872        // case the corresponding ResultOrException instance for the Put or Delete will be added
873        // down in the doNonAtomicBatchOp method call rather than up here.
874      } catch (IOException ie) {
875        rpcServer.getMetrics().exception(ie);
876        hasResultOrException = true;
877        NameBytesPair pair = ResponseConverter.buildException(ie);
878        resultOrExceptionBuilder.setException(pair);
879        context.incrementResponseExceptionSize(pair.getSerializedSize());
880      }
881      if (hasResultOrException) {
882        // Propagate index.
883        resultOrExceptionBuilder.setIndex(action.getIndex());
884        builder.addResultOrException(resultOrExceptionBuilder.build());
885      }
886    }
887    // Finish up any outstanding mutations
888    if (!CollectionUtils.isEmpty(mutations)) {
889      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
890    }
891    return cellsToReturn;
892  }
893
894  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
895    if (r.maxCellSize > 0) {
896      CellScanner cells = m.cellScanner();
897      while (cells.advance()) {
898        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
899        if (size > r.maxCellSize) {
900          String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of "
901            + r.maxCellSize + " bytes";
902          LOG.debug(msg);
903          throw new DoNotRetryIOException(msg);
904        }
905      }
906    }
907  }
908
  /**
   * Execute a batch of mutations atomically, in client order.
   * <p>
   * Just throw the exception. The exception will be caught and then added to the region-level
   * exception for the RegionAction. Leaving the null in the action result is ok since the null
   * result is viewed as failure by the hbase client. And the region-level exception will be used
   * to replace the null result. See AsyncRequestFutureImpl#receiveMultiAction and
   * AsyncBatchRpcRetryingCaller#onComplete for more details.
   */
  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
  }
919
920  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
921    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
922    ActivePolicyEnforcement spaceQuotaEnforcement) {
923    try {
924      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
925        spaceQuotaEnforcement, false);
926    } catch (IOException e) {
927      // Set the exception for each action. The mutations in same RegionAction are group to
928      // different batch and then be processed individually. Hence, we don't set the region-level
929      // exception here for whole RegionAction.
930      for (Action mutation : mutations) {
931        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
932      }
933    }
934  }
935
936  /**
937   * Execute a list of mutations.
938   */
939  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
940    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
941    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
942    throws IOException {
943    Mutation[] mArray = new Mutation[mutations.size()];
944    long before = EnvironmentEdgeManager.currentTime();
945    boolean batchContainsPuts = false, batchContainsDelete = false;
946    try {
947      /**
948       * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions
949       * since mutation array may have been reoredered.In order to return the right result or
950       * exception to the corresponding actions, We need to know which action is the mutation belong
951       * to. We can't sort ClientProtos.Action array, since they are bonded to cellscanners.
952       */
953      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
954      int i = 0;
955      long nonce = HConstants.NO_NONCE;
956      for (ClientProtos.Action action : mutations) {
957        if (action.hasGet()) {
958          throw new DoNotRetryIOException(
959            "Atomic put and/or delete only, not a Get=" + action.getGet());
960        }
961        MutationProto m = action.getMutation();
962        Mutation mutation;
963        switch (m.getMutateType()) {
964          case PUT:
965            mutation = ProtobufUtil.toPut(m, cells);
966            batchContainsPuts = true;
967            break;
968
969          case DELETE:
970            mutation = ProtobufUtil.toDelete(m, cells);
971            batchContainsDelete = true;
972            break;
973
974          case INCREMENT:
975            mutation = ProtobufUtil.toIncrement(m, cells);
976            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
977            break;
978
979          case APPEND:
980            mutation = ProtobufUtil.toAppend(m, cells);
981            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
982            break;
983
984          default:
985            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
986        }
987        mutationActionMap.put(mutation, action);
988        mArray[i++] = mutation;
989        checkCellSizeLimit(region, mutation);
990        // Check if a space quota disallows this mutation
991        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
992        quota.addMutation(mutation);
993      }
994
995      if (!region.getRegionInfo().isMetaRegion()) {
996        server.getMemStoreFlusher().reclaimMemStoreMemory();
997      }
998
999      // HBASE-17924
1000      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
1001      // order is preserved as its expected from the client
1002      if (!atomic) {
1003        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
1004      }
1005
1006      OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);
1007
1008      // When atomic is true, it indicates that the mutateRow API or the batch API with
1009      // RowMutations is called. In this case, we need to merge the results of the
1010      // Increment/Append operations if the mutations include those operations, and set the merged
1011      // result to the first element of the ResultOrException list
1012      if (atomic) {
1013        List<ResultOrException> resultOrExceptions = new ArrayList<>();
1014        List<Result> results = new ArrayList<>();
1015        for (i = 0; i < codes.length; i++) {
1016          if (codes[i].getResult() != null) {
1017            results.add(codes[i].getResult());
1018          }
1019          if (i != 0) {
1020            resultOrExceptions
1021              .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i));
1022          }
1023        }
1024
1025        if (results.isEmpty()) {
1026          builder.addResultOrException(
1027            getResultOrException(ClientProtos.Result.getDefaultInstance(), 0));
1028        } else {
1029          // Merge the results of the Increment/Append operations
1030          List<Cell> cellList = new ArrayList<>();
1031          for (Result result : results) {
1032            if (result.rawCells() != null) {
1033              cellList.addAll(Arrays.asList(result.rawCells()));
1034            }
1035          }
1036          Result result = Result.create(cellList);
1037
1038          // Set the merged result of the Increment/Append operations to the first element of the
1039          // ResultOrException list
1040          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
1041        }
1042
1043        builder.addAllResultOrException(resultOrExceptions);
1044        return;
1045      }
1046
1047      for (i = 0; i < codes.length; i++) {
1048        Mutation currentMutation = mArray[i];
1049        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
1050        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
1051        Exception e;
1052        switch (codes[i].getOperationStatusCode()) {
1053          case BAD_FAMILY:
1054            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
1055            builder.addResultOrException(getResultOrException(e, index));
1056            break;
1057
1058          case SANITY_CHECK_FAILURE:
1059            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
1060            builder.addResultOrException(getResultOrException(e, index));
1061            break;
1062
1063          default:
1064            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
1065            builder.addResultOrException(getResultOrException(e, index));
1066            break;
1067
1068          case SUCCESS:
1069            ClientProtos.Result result = codes[i].getResult() == null
1070              ? ClientProtos.Result.getDefaultInstance()
1071              : ProtobufUtil.toResult(codes[i].getResult());
1072            builder.addResultOrException(getResultOrException(result, index));
1073            break;
1074
1075          case STORE_TOO_BUSY:
1076            e = new RegionTooBusyException(codes[i].getExceptionMsg());
1077            builder.addResultOrException(getResultOrException(e, index));
1078            break;
1079        }
1080      }
1081    } finally {
1082      int processedMutationIndex = 0;
1083      for (Action mutation : mutations) {
1084        // The non-null mArray[i] means the cell scanner has been read.
1085        if (mArray[processedMutationIndex++] == null) {
1086          skipCellsForMutation(mutation, cells);
1087        }
1088      }
1089      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1090    }
1091  }
1092
1093  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
1094    boolean batchContainsDelete) {
1095    final MetricsRegionServer metricsRegionServer = server.getMetrics();
1096    if (metricsRegionServer != null) {
1097      long after = EnvironmentEdgeManager.currentTime();
1098      if (batchContainsPuts) {
1099        metricsRegionServer.updatePutBatch(region, after - starttime);
1100      }
1101      if (batchContainsDelete) {
1102        metricsRegionServer.updateDeleteBatch(region, after - starttime);
1103      }
1104    }
1105  }
1106
1107  /**
1108   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
1109   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
1110   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
1111   *         exceptionMessage if any
1112   * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying
1113   *             edits for secondary replicas any more, see
1114   *             {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}.
1115   */
1116  @Deprecated
1117  private OperationStatus[] doReplayBatchOp(final HRegion region,
1118    final List<MutationReplay> mutations, long replaySeqId) throws IOException {
1119    long before = EnvironmentEdgeManager.currentTime();
1120    boolean batchContainsPuts = false, batchContainsDelete = false;
1121    try {
1122      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
1123        MutationReplay m = it.next();
1124
1125        if (m.getType() == MutationType.PUT) {
1126          batchContainsPuts = true;
1127        } else {
1128          batchContainsDelete = true;
1129        }
1130
1131        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
1132        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
1133        if (metaCells != null && !metaCells.isEmpty()) {
1134          for (Cell metaCell : metaCells) {
1135            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
1136            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1137            HRegion hRegion = region;
1138            if (compactionDesc != null) {
1139              // replay the compaction. Remove the files from stores only if we are the primary
1140              // region replica (thus own the files)
1141              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
1142                replaySeqId);
1143              continue;
1144            }
1145            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
1146            if (flushDesc != null && !isDefaultReplica) {
1147              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
1148              continue;
1149            }
1150            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
1151            if (regionEvent != null && !isDefaultReplica) {
1152              hRegion.replayWALRegionEventMarker(regionEvent);
1153              continue;
1154            }
1155            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
1156            if (bulkLoadEvent != null) {
1157              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
1158              continue;
1159            }
1160          }
1161          it.remove();
1162        }
1163      }
1164      requestCount.increment();
1165      if (!region.getRegionInfo().isMetaRegion()) {
1166        server.getMemStoreFlusher().reclaimMemStoreMemory();
1167      }
1168      return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
1169        replaySeqId);
1170    } finally {
1171      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1172    }
1173  }
1174
1175  private void closeAllScanners() {
1176    // Close any outstanding scanners. Means they'll get an UnknownScanner
1177    // exception next time they come in.
1178    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
1179      try {
1180        e.getValue().s.close();
1181      } catch (IOException ioe) {
1182        LOG.warn("Closing scanner " + e.getKey(), ioe);
1183      }
1184    }
1185  }
1186
  // Directly invoked only for testing
  /**
   * Construct the RPC services front-end for the given region server. Reads scanner lease,
   * rpc timeout and minimum scan time limit settings from the server's configuration.
   */
  public RSRpcServices(final HRegionServer rs) throws IOException {
    super(rs, rs.getProcessName());
    final Configuration conf = rs.getConfiguration();
    setReloadableGuardrails(conf);
    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
    rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
    // Entries expire one scanner-lease period after last access; presumably used to tell
    // "recently closed" apart from "unknown" scanner ids — TODO confirm at the usage sites.
    closedScanners = CacheBuilder.newBuilder()
      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }
1202
  /** Region servers enable the byte-buffer reservoir by default. */
  @Override
  protected boolean defaultReservoirEnabled() {
    return true;
  }
1207
  /** Returns the DNS server type used when resolving this process's hostname. */
  @Override
  protected ServerType getDNSServerType() {
    return DNS.ServerType.REGIONSERVER;
  }
1212
  /** Returns the hostname to bind to, preferring "hbase.regionserver.ipc.address" if set. */
  @Override
  protected String getHostname(Configuration conf, String defaultHostname) {
    return conf.get("hbase.regionserver.ipc.address", defaultHostname);
  }
1217
  /** Returns the configuration key that holds the region server port. */
  @Override
  protected String getPortConfigName() {
    return HConstants.REGIONSERVER_PORT;
  }
1222
  /** Returns the default region server port used when none is configured. */
  @Override
  protected int getDefaultPort() {
    return HConstants.DEFAULT_REGIONSERVER_PORT;
  }
1227
  /** Returns the configured RpcSchedulerFactory class, defaulting to SimpleRpcSchedulerFactory. */
  @Override
  protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }
1233
1234  protected RpcServerInterface createRpcServer(final Server server,
1235    final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
1236    final String name) throws IOException {
1237    final Configuration conf = server.getConfiguration();
1238    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
1239    try {
1240      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, // use final
1241                                                                                        // bindAddress
1242                                                                                        // for this
1243                                                                                        // server.
1244        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
1245    } catch (BindException be) {
1246      throw new IOException(be.getMessage() + ". To switch ports use the '"
1247        + HConstants.REGIONSERVER_PORT + "' configuration property.",
1248        be.getCause() != null ? be.getCause() : be);
1249    }
1250  }
1251
1252  protected Class<?> getRpcSchedulerFactoryClass() {
1253    final Configuration conf = server.getConfiguration();
1254    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
1255      SimpleRpcSchedulerFactory.class);
1256  }
1257
  /** Returns the annotation-driven priority function used to prioritize incoming RPCs. */
  protected PriorityFunction createPriority() {
    return new RSAnnotationReadingPriorityFunction(this);
  }
1261
  /** Returns the number of currently outstanding region scanners. */
  public int getScannersCount() {
    return scanners.size();
  }
1265
1266  /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
1267  RegionScanner getScanner(long scannerId) {
1268    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
1269    return rsh == null ? null : rsh.s;
1270  }
1271
  /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null. */
  private RegionScannerHolder getRegionScannerHolder(long scannerId) {
    // Scanner map is keyed by the string form of the id; see toScannerName.
    return scanners.get(toScannerName(scannerId));
  }
1276
1277  public String getScanDetailsWithId(long scannerId) {
1278    RegionScanner scanner = getScanner(scannerId);
1279    if (scanner == null) {
1280      return null;
1281    }
1282    StringBuilder builder = new StringBuilder();
1283    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
1284    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
1285    builder.append(" operation_id: ").append(scanner.getOperationId());
1286    return builder.toString();
1287  }
1288
1289  public String getScanDetailsWithRequest(ScanRequest request) {
1290    try {
1291      if (!request.hasRegion()) {
1292        return null;
1293      }
1294      Region region = getRegion(request.getRegion());
1295      StringBuilder builder = new StringBuilder();
1296      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
1297      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
1298      for (NameBytesPair pair : request.getScan().getAttributeList()) {
1299        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
1300          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
1301          break;
1302        }
1303      }
1304      return builder.toString();
1305    } catch (IOException ignored) {
1306      return null;
1307    }
1308  }
1309
1310  /**
1311   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
1312   */
1313  long getScannerVirtualTime(long scannerId) {
1314    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
1315    return rsh == null ? 0L : rsh.getNextCallSeq();
1316  }
1317
1318  /**
1319   * Method to account for the size of retained cells.
1320   * @param context rpc call context
1321   * @param r       result to add size.
1322   * @return an object that represents the last referenced block from this response.
1323   */
1324  void addSize(RpcCallContext context, Result r) {
1325    if (context != null && r != null && !r.isEmpty()) {
1326      for (Cell c : r.rawCells()) {
1327        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
1328      }
1329    }
1330  }
1331
1332  /** Returns Remote client's ip and port else null if can't be determined. */
1333  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
1334      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
1335  static String getRemoteClientIpAndPort() {
1336    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
1337    if (rpcCall == null) {
1338      return HConstants.EMPTY_STRING;
1339    }
1340    InetAddress address = rpcCall.getRemoteAddress();
1341    if (address == null) {
1342      return HConstants.EMPTY_STRING;
1343    }
1344    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
1345    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
1346    // scanning than a hostname anyways.
1347    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
1348  }
1349
1350  /** Returns Remote client's username. */
1351  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
1352      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
1353  static String getUserName() {
1354    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
1355    if (rpcCall == null) {
1356      return HConstants.EMPTY_STRING;
1357    }
1358    return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING);
1359  }
1360
  /**
   * Registers a newly opened scanner with this server: creates the lease that expires the scanner
   * if the client goes away, wires up the shipped/close RPC callbacks, and publishes the holder in
   * the scanners map.
   * @param scannerName    name (string form of the scanner id) to register the scanner under
   * @param s              the scanner being tracked
   * @param shipper        notified when a response has been shipped to the client
   * @param r              region the scanner reads from
   * @param needCursor     whether cursor results were requested for this scan
   * @param fullRegionScan whether the scan covers the whole region
   * @return the holder now registered for <code>scannerName</code>
   * @throws LeaseStillHeldException if a lease already exists under this scanner name
   */
  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
    HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
    Lease lease = server.getLeaseManager().createLease(scannerName, this.scannerLeaseTimeoutPeriod,
      new ScannerListener(scannerName));
    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
    // A scanner that is itself an RpcCallback handles its own close notification.
    RpcCallback closeCallback =
      s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s);
    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
      needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
      + scannerName + ", " + existing;
    return rsh;
  }
1375
1376  private boolean isFullRegionScan(Scan scan, HRegion region) {
1377    // If the scan start row equals or less than the start key of the region
1378    // and stop row greater than equals end key (if stop row present)
1379    // or if the stop row is empty
1380    // account this as a full region scan
1381    if (
1382      Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0
1383        && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0
1384          && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)
1385          || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
1386    ) {
1387      return true;
1388    }
1389    return false;
1390  }
1391
1392  /**
1393   * Find the HRegion based on a region specifier
1394   * @param regionSpecifier the region specifier
1395   * @return the corresponding region
1396   * @throws IOException if the specifier is not null, but failed to find the region
1397   */
1398  public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException {
1399    return server.getRegion(regionSpecifier.getValue().toByteArray());
1400  }
1401
1402  /**
1403   * Find the List of HRegions based on a list of region specifiers
1404   * @param regionSpecifiers the list of region specifiers
1405   * @return the corresponding list of regions
1406   * @throws IOException if any of the specifiers is not null, but failed to find the region
1407   */
1408  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
1409    final CacheEvictionStatsBuilder stats) {
1410    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
1411    for (RegionSpecifier regionSpecifier : regionSpecifiers) {
1412      try {
1413        regions.add(server.getRegion(regionSpecifier.getValue().toByteArray()));
1414      } catch (NotServingRegionException e) {
1415        stats.addException(regionSpecifier.getValue().toByteArray(), e);
1416      }
1417    }
1418    return regions;
1419  }
1420
  /** Returns the priority function used to grade incoming requests. */
  PriorityFunction getPriority() {
    return priority;
  }
1424
  /** Returns the server's RPC quota manager. */
  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return server.getRegionServerRpcQuotaManager();
  }
1428
  /** Returns the server's space quota manager. */
  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return server.getRegionServerSpaceQuotaManager();
  }
1432
  /** Starts these services: creates the scanner id generator, then runs subclass startup. */
  void start(ZKWatcher zkWatcher) {
    this.scannerIdGenerator = new ScannerIdGenerator(this.server.getServerName());
    internalStart(zkWatcher);
  }
1437
  /** Stops these services: closes all outstanding scanners, then runs subclass shutdown. */
  void stop() {
    closeAllScanners();
    internalStop();
  }
1442
1443  /**
1444   * Called to verify that this server is up and running.
1445   */
1446  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
1447  protected void checkOpen() throws IOException {
1448    if (server.isAborted()) {
1449      throw new RegionServerAbortedException("Server " + server.getServerName() + " aborting");
1450    }
1451    if (server.isStopped()) {
1452      throw new RegionServerStoppedException("Server " + server.getServerName() + " stopping");
1453    }
1454    if (!server.isDataFileSystemOk()) {
1455      throw new RegionServerStoppedException("File system not available");
1456    }
1457    if (!server.isOnline()) {
1458      throw new ServerNotRunningYetException(
1459        "Server " + server.getServerName() + " is not running yet");
1460    }
1461  }
1462
1463  /**
1464   * By default, put up an Admin and a Client Service. Set booleans
1465   * <code>hbase.regionserver.admin.executorService</code> and
1466   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
1467   * Default is that both are enabled.
1468   * @return immutable list of blocking services and the security info classes that this server
1469   *         supports
1470   */
1471  protected List<BlockingServiceAndInterface> getServices() {
1472    boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
1473    boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
1474    boolean clientMeta =
1475      getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
1476    boolean bootstrapNodes =
1477      getConfiguration().getBoolean(REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG, true);
1478    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
1479    if (client) {
1480      bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
1481        ClientService.BlockingInterface.class));
1482    }
1483    if (admin) {
1484      bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
1485        AdminService.BlockingInterface.class));
1486    }
1487    if (clientMeta) {
1488      bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
1489        ClientMetaService.BlockingInterface.class));
1490    }
1491    if (bootstrapNodes) {
1492      bssi.add(
1493        new BlockingServiceAndInterface(BootstrapNodeService.newReflectiveBlockingService(this),
1494          BootstrapNodeService.BlockingInterface.class));
1495    }
1496    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
1497  }
1498
1499  /**
1500   * Close a region on the region server.
1501   * @param controller the RPC controller
1502   * @param request    the request
1503   */
1504  @Override
1505  @QosPriority(priority = HConstants.ADMIN_QOS)
1506  public CloseRegionResponse closeRegion(final RpcController controller,
1507    final CloseRegionRequest request) throws ServiceException {
1508    final ServerName sn = (request.hasDestinationServer()
1509      ? ProtobufUtil.toServerName(request.getDestinationServer())
1510      : null);
1511
1512    try {
1513      checkOpen();
1514      throwOnWrongStartCode(request);
1515      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1516
1517      requestCount.increment();
1518      if (sn == null) {
1519        LOG.info("Close " + encodedRegionName + " without moving");
1520      } else {
1521        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1522      }
1523      boolean closed = server.closeRegion(encodedRegionName, false, sn);
1524      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1525      return builder.build();
1526    } catch (IOException ie) {
1527      throw new ServiceException(ie);
1528    }
1529  }
1530
1531  /**
1532   * Compact a region on the region server.
1533   * @param controller the RPC controller
1534   * @param request    the request
1535   */
1536  @Override
1537  @QosPriority(priority = HConstants.ADMIN_QOS)
1538  public CompactRegionResponse compactRegion(final RpcController controller,
1539    final CompactRegionRequest request) throws ServiceException {
1540    try {
1541      checkOpen();
1542      requestCount.increment();
1543      HRegion region = getRegion(request.getRegion());
1544      // Quota support is enabled, the requesting user is not system/super user
1545      // and a quota policy is enforced that disables compactions.
1546      if (
1547        QuotaUtil.isQuotaEnabled(getConfiguration())
1548          && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null))
1549          && this.server.getRegionServerSpaceQuotaManager()
1550            .areCompactionsDisabled(region.getTableDescriptor().getTableName())
1551      ) {
1552        throw new DoNotRetryIOException(
1553          "Compactions on this region are " + "disabled due to a space quota violation.");
1554      }
1555      region.startRegionOperation(Operation.COMPACT_REGION);
1556      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1557      boolean major = request.hasMajor() && request.getMajor();
1558      if (request.hasFamily()) {
1559        byte[] family = request.getFamily().toByteArray();
1560        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
1561          + region.getRegionInfo().getRegionNameAsString() + " and family "
1562          + Bytes.toString(family);
1563        LOG.trace(log);
1564        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
1565          CompactionLifeCycleTracker.DUMMY);
1566      } else {
1567        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
1568          + region.getRegionInfo().getRegionNameAsString();
1569        LOG.trace(log);
1570        region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
1571      }
1572      return CompactRegionResponse.newBuilder().build();
1573    } catch (IOException ie) {
1574      throw new ServiceException(ie);
1575    }
1576  }
1577
1578  @Override
1579  @QosPriority(priority = HConstants.ADMIN_QOS)
1580  public CompactionSwitchResponse compactionSwitch(RpcController controller,
1581    CompactionSwitchRequest request) throws ServiceException {
1582    rpcPreCheck("compactionSwitch");
1583    final CompactSplit compactSplitThread = server.getCompactSplitThread();
1584    requestCount.increment();
1585    boolean prevState = compactSplitThread.isCompactionsEnabled();
1586    CompactionSwitchResponse response =
1587      CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
1588    if (prevState == request.getEnabled()) {
1589      // passed in requested state is same as current state. No action required
1590      return response;
1591    }
1592    compactSplitThread.switchCompaction(request.getEnabled());
1593    return response;
1594  }
1595
1596  /**
1597   * Flush a region on the region server.
1598   * @param controller the RPC controller
1599   * @param request    the request
1600   */
1601  @Override
1602  @QosPriority(priority = HConstants.ADMIN_QOS)
1603  public FlushRegionResponse flushRegion(final RpcController controller,
1604    final FlushRegionRequest request) throws ServiceException {
1605    try {
1606      checkOpen();
1607      requestCount.increment();
1608      HRegion region = getRegion(request.getRegion());
1609      LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1610      boolean shouldFlush = true;
1611      if (request.hasIfOlderThanTs()) {
1612        shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1613      }
1614      FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1615      if (shouldFlush) {
1616        boolean writeFlushWalMarker =
1617          request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false;
1618        // Go behind the curtain so we can manage writing of the flush WAL marker
1619        HRegion.FlushResultImpl flushResult = null;
1620        if (request.hasFamily()) {
1621          List families = new ArrayList();
1622          families.add(request.getFamily().toByteArray());
1623          flushResult =
1624            region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1625        } else {
1626          flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1627        }
1628        boolean compactionNeeded = flushResult.isCompactionNeeded();
1629        if (compactionNeeded) {
1630          server.getCompactSplitThread().requestSystemCompaction(region,
1631            "Compaction through user triggered flush");
1632        }
1633        builder.setFlushed(flushResult.isFlushSucceeded());
1634        builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1635      }
1636      builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1637      return builder.build();
1638    } catch (DroppedSnapshotException ex) {
1639      // Cache flush can fail in a few places. If it fails in a critical
1640      // section, we get a DroppedSnapshotException and a replay of wal
1641      // is required. Currently the only way to do this is a restart of
1642      // the server.
1643      server.abort("Replay of WAL required. Forcing server shutdown", ex);
1644      throw new ServiceException(ex);
1645    } catch (IOException ie) {
1646      throw new ServiceException(ie);
1647    }
1648  }
1649
1650  @Override
1651  @QosPriority(priority = HConstants.ADMIN_QOS)
1652  public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1653    final GetOnlineRegionRequest request) throws ServiceException {
1654    try {
1655      checkOpen();
1656      requestCount.increment();
1657      Map<String, HRegion> onlineRegions = server.getOnlineRegions();
1658      List<RegionInfo> list = new ArrayList<>(onlineRegions.size());
1659      for (HRegion region : onlineRegions.values()) {
1660        list.add(region.getRegionInfo());
1661      }
1662      list.sort(RegionInfo.COMPARATOR);
1663      return ResponseConverter.buildGetOnlineRegionResponse(list);
1664    } catch (IOException ie) {
1665      throw new ServiceException(ie);
1666    }
1667  }
1668
  // Master implementation of this Admin Service differs given it is not
  // able to supply detail only known to RegionServer. See note on
  // MasterRpcServers#getRegionInfo.
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
    final GetRegionInfoRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      RegionInfo info = region.getRegionInfo();
      byte[] bestSplitRow;
      // The best split row is only computed when the request explicitly asks for it.
      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
        bestSplitRow = region.checkSplit(true).orElse(null);
        // when all table data are in memstore, bestSplitRow = null
        // try to flush region first
        if (bestSplitRow == null) {
          region.flush(true);
          bestSplitRow = region.checkSplit(true).orElse(null);
        }
      } else {
        bestSplitRow = null;
      }
      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
      // Compaction state is likewise opt-in.
      if (request.hasCompactionState() && request.getCompactionState()) {
        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
      }
      builder.setSplittable(region.isSplittable());
      builder.setMergeable(region.isMergeable());
      // Only set a best split row when one was requested and found (possibly after the flush).
      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }
1708
1709  @Override
1710  @QosPriority(priority = HConstants.ADMIN_QOS)
1711  public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
1712    throws ServiceException {
1713
1714    List<HRegion> regions;
1715    if (request.hasTableName()) {
1716      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
1717      regions = server.getRegions(tableName);
1718    } else {
1719      regions = server.getRegions();
1720    }
1721    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
1722    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
1723    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1724
1725    try {
1726      for (HRegion region : regions) {
1727        rLoads.add(server.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
1728      }
1729    } catch (IOException e) {
1730      throw new ServiceException(e);
1731    }
1732    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
1733    builder.addAllRegionLoads(rLoads);
1734    return builder.build();
1735  }
1736
  /**
   * Clears the compaction queues named in the request ("long" and/or "short"). Only one clear
   * request may run at a time; a concurrent request is logged and ignored.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
    ClearCompactionQueuesRequest request) throws ServiceException {
    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
      + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
    requestCount.increment();
    // The CAS guards against two admins clearing the queues concurrently.
    if (clearCompactionQueues.compareAndSet(false, true)) {
      final CompactSplit compactSplitThread = server.getCompactSplitThread();
      try {
        checkOpen();
        server.getRegionServerCoprocessorHost().preClearCompactionQueues();
        for (String queueName : request.getQueueNameList()) {
          LOG.debug("clear " + queueName + " compaction queue");
          switch (queueName) {
            case "long":
              compactSplitThread.clearLongCompactionsQueue();
              break;
            case "short":
              compactSplitThread.clearShortCompactionsQueue();
              break;
            default:
              LOG.warn("Unknown queue name " + queueName);
              throw new IOException("Unknown queue name " + queueName);
          }
        }
        server.getRegionServerCoprocessorHost().postClearCompactionQueues();
      } catch (IOException ie) {
        throw new ServiceException(ie);
      } finally {
        // Always release the guard so the next clear request can proceed.
        clearCompactionQueues.set(false);
      }
    } else {
      LOG.warn("Clear compactions queue is executing by other admin.");
    }
    return respBuilder.build();
  }
1775
1776  /**
1777   * Get some information of the region server.
1778   * @param controller the RPC controller
1779   * @param request    the request
1780   */
1781  @Override
1782  @QosPriority(priority = HConstants.ADMIN_QOS)
1783  public GetServerInfoResponse getServerInfo(final RpcController controller,
1784    final GetServerInfoRequest request) throws ServiceException {
1785    try {
1786      checkOpen();
1787    } catch (IOException ie) {
1788      throw new ServiceException(ie);
1789    }
1790    requestCount.increment();
1791    int infoPort = server.getInfoServer() != null ? server.getInfoServer().getPort() : -1;
1792    return ResponseConverter.buildGetServerInfoResponse(server.getServerName(), infoPort);
1793  }
1794
1795  @Override
1796  @QosPriority(priority = HConstants.ADMIN_QOS)
1797  public GetStoreFileResponse getStoreFile(final RpcController controller,
1798    final GetStoreFileRequest request) throws ServiceException {
1799    try {
1800      checkOpen();
1801      HRegion region = getRegion(request.getRegion());
1802      requestCount.increment();
1803      Set<byte[]> columnFamilies;
1804      if (request.getFamilyCount() == 0) {
1805        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
1806      } else {
1807        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
1808        for (ByteString cf : request.getFamilyList()) {
1809          columnFamilies.add(cf.toByteArray());
1810        }
1811      }
1812      int nCF = columnFamilies.size();
1813      List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
1814      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1815      builder.addAllStoreFile(fileList);
1816      return builder.build();
1817    } catch (IOException ie) {
1818      throw new ServiceException(ie);
1819    }
1820  }
1821
1822  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
1823    if (!request.hasServerStartCode()) {
1824      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
1825      return;
1826    }
1827    throwOnWrongStartCode(request.getServerStartCode());
1828  }
1829
1830  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
1831    if (!request.hasServerStartCode()) {
1832      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
1833      return;
1834    }
1835    throwOnWrongStartCode(request.getServerStartCode());
1836  }
1837
1838  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
1839    // check that we are the same server that this RPC is intended for.
1840    if (server.getServerName().getStartcode() != serverStartCode) {
1841      throw new ServiceException(new DoNotRetryIOException(
1842        "This RPC was intended for a " + "different server with startCode: " + serverStartCode
1843          + ", this server is: " + server.getServerName()));
1844    }
1845  }
1846
1847  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
1848    if (req.getOpenRegionCount() > 0) {
1849      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
1850        throwOnWrongStartCode(openReq);
1851      }
1852    }
1853    if (req.getCloseRegionCount() > 0) {
1854      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
1855        throwOnWrongStartCode(closeReq);
1856      }
1857    }
1858  }
1859
1860  /**
1861   * Open asynchronously a region or a set of regions on the region server. The opening is
1862   * coordinated by ZooKeeper, and this method requires the znode to be created before being called.
1863   * As a consequence, this method should be called only from the master.
1864   * <p>
1865   * Different manages states for the region are:
1866   * </p>
1867   * <ul>
1868   * <li>region not opened: the region opening will start asynchronously.</li>
1869   * <li>a close is already in progress: this is considered as an error.</li>
1870   * <li>an open is already in progress: this new open request will be ignored. This is important
1871   * because the Master can do multiple requests if it crashes.</li>
1872   * <li>the region is already opened: this new open request will be ignored.</li>
1873   * </ul>
1874   * <p>
1875   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
1876   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
1877   * errors are put in the response as FAILED_OPENING.
1878   * </p>
1879   * @param controller the RPC controller
1880   * @param request    the request
1881   */
1882  @Override
1883  @QosPriority(priority = HConstants.ADMIN_QOS)
1884  public OpenRegionResponse openRegion(final RpcController controller,
1885    final OpenRegionRequest request) throws ServiceException {
1886    requestCount.increment();
1887    throwOnWrongStartCode(request);
1888
1889    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
1890    final int regionCount = request.getOpenInfoCount();
1891    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
1892    final boolean isBulkAssign = regionCount > 1;
1893    try {
1894      checkOpen();
1895    } catch (IOException ie) {
1896      TableName tableName = null;
1897      if (regionCount == 1) {
1898        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
1899          request.getOpenInfo(0).getRegion();
1900        if (ri != null) {
1901          tableName = ProtobufUtil.toTableName(ri.getTableName());
1902        }
1903      }
1904      if (!TableName.META_TABLE_NAME.equals(tableName)) {
1905        throw new ServiceException(ie);
1906      }
1907      // We are assigning meta, wait a little for regionserver to finish initialization.
1908      // Default to quarter of RPC timeout
1909      int timeout = server.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1910        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
1911      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
1912      synchronized (server.online) {
1913        try {
1914          while (
1915            EnvironmentEdgeManager.currentTime() <= endTime && !server.isStopped()
1916              && !server.isOnline()
1917          ) {
1918            server.online.wait(server.getMsgInterval());
1919          }
1920          checkOpen();
1921        } catch (InterruptedException t) {
1922          Thread.currentThread().interrupt();
1923          throw new ServiceException(t);
1924        } catch (IOException e) {
1925          throw new ServiceException(e);
1926        }
1927      }
1928    }
1929
1930    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1931
1932    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
1933      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
1934      TableDescriptor htd;
1935      try {
1936        String encodedName = region.getEncodedName();
1937        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
1938        final HRegion onlineRegion = server.getRegion(encodedName);
1939        if (onlineRegion != null) {
1940          // The region is already online. This should not happen any more.
1941          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
1942            + ", which is already online";
1943          LOG.warn(error);
1944          // server.abort(error);
1945          // throw new IOException(error);
1946          builder.addOpeningState(RegionOpeningState.OPENED);
1947          continue;
1948        }
1949        LOG.info("Open " + region.getRegionNameAsString());
1950
1951        final Boolean previous =
1952          server.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);
1953
1954        if (Boolean.FALSE.equals(previous)) {
1955          if (server.getRegion(encodedName) != null) {
1956            // There is a close in progress. This should not happen any more.
1957            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
1958              + ", which we are already trying to CLOSE";
1959            server.abort(error);
1960            throw new IOException(error);
1961          }
1962          server.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
1963        }
1964
1965        if (Boolean.TRUE.equals(previous)) {
1966          // An open is in progress. This is supported, but let's log this.
1967          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
1968            + ", which we are already trying to OPEN"
1969            + " - ignoring this new request for this region.");
1970        }
1971
1972        // We are opening this region. If it moves back and forth for whatever reason, we don't
1973        // want to keep returning the stale moved record while we are opening/if we close again.
1974        server.removeFromMovedRegions(region.getEncodedName());
1975
1976        if (previous == null || !previous.booleanValue()) {
1977          htd = htds.get(region.getTable());
1978          if (htd == null) {
1979            htd = server.getTableDescriptors().get(region.getTable());
1980            htds.put(region.getTable(), htd);
1981          }
1982          if (htd == null) {
1983            throw new IOException("Missing table descriptor for " + region.getEncodedName());
1984          }
1985          // If there is no action in progress, we can submit a specific handler.
1986          // Need to pass the expected version in the constructor.
1987          if (server.getExecutorService() == null) {
1988            LOG.info("No executor executorService; skipping open request");
1989          } else {
1990            if (region.isMetaRegion()) {
1991              server.getExecutorService()
1992                .submit(new OpenMetaHandler(server, server, region, htd, masterSystemTime));
1993            } else {
1994              if (regionOpenInfo.getFavoredNodesCount() > 0) {
1995                server.updateRegionFavoredNodesMapping(region.getEncodedName(),
1996                  regionOpenInfo.getFavoredNodesList());
1997              }
1998              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
1999                server.getExecutorService().submit(
2000                  new OpenPriorityRegionHandler(server, server, region, htd, masterSystemTime));
2001              } else {
2002                server.getExecutorService()
2003                  .submit(new OpenRegionHandler(server, server, region, htd, masterSystemTime));
2004              }
2005            }
2006          }
2007        }
2008
2009        builder.addOpeningState(RegionOpeningState.OPENED);
2010      } catch (IOException ie) {
2011        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
2012        if (isBulkAssign) {
2013          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
2014        } else {
2015          throw new ServiceException(ie);
2016        }
2017      }
2018    }
2019    return builder.build();
2020  }
2021
2022  /**
2023   * Warmup a region on this server. This method should only be called by Master. It synchronously
2024   * opens the region and closes the region bringing the most important pages in cache.
2025   */
2026  @Override
2027  public WarmupRegionResponse warmupRegion(final RpcController controller,
2028    final WarmupRegionRequest request) throws ServiceException {
2029    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
2030    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
2031    try {
2032      checkOpen();
2033      String encodedName = region.getEncodedName();
2034      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
2035      final HRegion onlineRegion = server.getRegion(encodedName);
2036      if (onlineRegion != null) {
2037        LOG.info("{} is online; skipping warmup", region);
2038        return response;
2039      }
2040      TableDescriptor htd = server.getTableDescriptors().get(region.getTable());
2041      if (server.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
2042        LOG.info("{} is in transition; skipping warmup", region);
2043        return response;
2044      }
2045      LOG.info("Warmup {}", region.getRegionNameAsString());
2046      HRegion.warmupHRegion(region, htd, server.getWAL(region), server.getConfiguration(), server,
2047        null);
2048    } catch (IOException ie) {
2049      LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
2050      throw new ServiceException(ie);
2051    }
2052
2053    return response;
2054  }
2055
2056  private CellScanner getAndReset(RpcController controller) {
2057    HBaseRpcController hrc = (HBaseRpcController) controller;
2058    CellScanner cells = hrc.cellScanner();
2059    hrc.setCellScanner(null);
2060    return cells;
2061  }
2062
  /**
   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
   * that the given mutations will be durable on the receiving RS if this method returns without any
   * exception.
   * @param controller the RPC controller
   * @param request    the request
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for
   *             compatibility with old region replica implementation. Now we will use
   *             {@code replicateToReplica} method instead.
   */
  @Deprecated
  @Override
  @QosPriority(priority = HConstants.REPLAY_QOS)
  public ReplicateWALEntryResponse replay(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    // Take ownership of the cell payload so it is consumed exactly once below.
    CellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      // All entries of one replay call must target the same region; resolve it from the first.
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      RegionCoprocessorHost coprocessorHost =
        ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
          ? region.getCoprocessorHost()
          : null; // do not invoke coprocessors if this is a secondary region replica

      // Skip adding the edits to WAL if this is a secondary region replica
      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;

      for (WALEntry entry : entries) {
        // Reject mixed-region batches outright; the client must retry per region.
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException("Replay request contains entries from multiple "
            + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
            + entry.getKey().getEncodedRegionName());
        }
        // Re-register nonces from the WAL (primary only) so operations retried by clients after
        // the original server's death are still deduplicated.
        if (server.nonceManager != null && isPrimary) {
          long nonceGroup =
            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          server.nonceManager.reportOperationFromWal(nonceGroup, nonce,
            entry.getKey().getWriteTime());
        }
        // NOTE(review): walEntry is only allocated when a coprocessor host exists, and is filled
        // by getMutationsFromWALEntry; no coprocessor hook is invoked in this method itself.
        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
        List<MutationReplay> edits =
          WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
        if (edits != null && !edits.isEmpty()) {
          // HBASE-17924
          // sort to improve lock efficiency
          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
          // Prefer the original sequence number when present so the edit replays with the id it
          // had on the failed server.
          long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
            ? entry.getKey().getOrigSequenceNumber()
            : entry.getKey().getLogSequenceNumber();
          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
          // check if it's a partial success
          for (int i = 0; result != null && i < result.length; i++) {
            if (result[i] != OperationStatus.SUCCESS) {
              throw new IOException(result[i].getExceptionMsg());
            }
          }
        }
      }

      // sync wal at the end because ASYNC_WAL is used above
      WAL wal = region.getWAL();
      if (wal != null) {
        wal.sync();
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record replay latency even on failure.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
      }
    }
  }
2146
2147  /**
2148   * Replay the given changes on a secondary replica
2149   */
2150  @Override
2151  public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
2152    ReplicateWALEntryRequest request) throws ServiceException {
2153    CellScanner cells = getAndReset(controller);
2154    try {
2155      checkOpen();
2156      List<WALEntry> entries = request.getEntryList();
2157      if (entries == null || entries.isEmpty()) {
2158        // empty input
2159        return ReplicateWALEntryResponse.newBuilder().build();
2160      }
2161      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
2162      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
2163      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2164        throw new DoNotRetryIOException(
2165          "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
2166      }
2167      for (WALEntry entry : entries) {
2168        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
2169          throw new NotServingRegionException(
2170            "ReplicateToReplica request contains entries from multiple " + "regions. First region:"
2171              + regionName.toStringUtf8() + " , other region:"
2172              + entry.getKey().getEncodedRegionName());
2173        }
2174        region.replayWALEntry(entry, cells);
2175      }
2176      return ReplicateWALEntryResponse.newBuilder().build();
2177    } catch (IOException ie) {
2178      throw new ServiceException(ie);
2179    }
2180  }
2181
2182  private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
2183    ReplicationSourceService replicationSource = server.getReplicationSourceService();
2184    if (replicationSource == null || entries.isEmpty()) {
2185      return;
2186    }
2187    // We can ensure that all entries are for one peer, so only need to check one entry's
2188    // table name. if the table hit sync replication at peer side and the peer cluster
2189    // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply
2190    // those entries according to the design doc.
2191    TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray());
2192    if (
2193      replicationSource.getSyncReplicationPeerInfoProvider().checkState(table,
2194        RejectReplicationRequestStateChecker.get())
2195    ) {
2196      throw new DoNotRetryIOException(
2197        "Reject to apply to sink cluster because sync replication state of sink cluster "
2198          + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table);
2199    }
2200  }
2201
2202  /**
2203   * Replicate WAL entries on the region server.
2204   * @param controller the RPC controller
2205   * @param request    the request
2206   */
2207  @Override
2208  @QosPriority(priority = HConstants.REPLICATION_QOS)
2209  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
2210    final ReplicateWALEntryRequest request) throws ServiceException {
2211    try {
2212      checkOpen();
2213      if (server.getReplicationSinkService() != null) {
2214        requestCount.increment();
2215        List<WALEntry> entries = request.getEntryList();
2216        checkShouldRejectReplicationRequest(entries);
2217        CellScanner cellScanner = getAndReset(controller);
2218        server.getRegionServerCoprocessorHost().preReplicateLogEntries();
2219        server.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
2220          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
2221          request.getSourceHFileArchiveDirPath());
2222        server.getRegionServerCoprocessorHost().postReplicateLogEntries();
2223        return ReplicateWALEntryResponse.newBuilder().build();
2224      } else {
2225        throw new ServiceException("Replication services are not initialized yet");
2226      }
2227    } catch (IOException ie) {
2228      throw new ServiceException(ie);
2229    }
2230  }
2231
2232  /**
2233   * Roll the WAL writer of the region server.
2234   * @param controller the RPC controller
2235   * @param request    the request
2236   */
2237  @Override
2238  @QosPriority(priority = HConstants.ADMIN_QOS)
2239  public RollWALWriterResponse rollWALWriter(final RpcController controller,
2240    final RollWALWriterRequest request) throws ServiceException {
2241    try {
2242      checkOpen();
2243      requestCount.increment();
2244      server.getRegionServerCoprocessorHost().preRollWALWriterRequest();
2245      server.getWalRoller().requestRollAll();
2246      server.getRegionServerCoprocessorHost().postRollWALWriterRequest();
2247      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
2248      return builder.build();
2249    } catch (IOException ie) {
2250      throw new ServiceException(ie);
2251    }
2252  }
2253
2254  /**
2255   * Stop the region server.
2256   * @param controller the RPC controller
2257   * @param request    the request
2258   */
2259  @Override
2260  @QosPriority(priority = HConstants.ADMIN_QOS)
2261  public StopServerResponse stopServer(final RpcController controller,
2262    final StopServerRequest request) throws ServiceException {
2263    rpcPreCheck("stopServer");
2264    requestCount.increment();
2265    String reason = request.getReason();
2266    server.stop(reason);
2267    return StopServerResponse.newBuilder().build();
2268  }
2269
2270  @Override
2271  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
2272    UpdateFavoredNodesRequest request) throws ServiceException {
2273    rpcPreCheck("updateFavoredNodes");
2274    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
2275    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
2276    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
2277      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
2278      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
2279        server.updateRegionFavoredNodesMapping(hri.getEncodedName(),
2280          regionUpdateInfo.getFavoredNodesList());
2281      }
2282    }
2283    respBuilder.setResponse(openInfoList.size());
2284    return respBuilder.build();
2285  }
2286
  /**
   * Atomically bulk load several HFiles into an open region
   * @return true if successful, false is failed but recoverably (no action)
   * @throws ServiceException if failed unrecoverably
   */
  @Override
  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
    final BulkLoadHFileRequest request) throws ServiceException {
    long start = EnvironmentEdgeManager.currentTime();
    // clusterIds records which clusters this load has passed through. If our own cluster id is
    // already in the list the load has come back to us, so answer "loaded" without re-applying —
    // presumably this breaks bulk-load replication cycles; TODO confirm against the
    // replication design. Otherwise stamp ourselves onto the list before proceeding.
    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
    if (clusterIds.contains(this.server.getClusterId())) {
      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
    } else {
      clusterIds.add(this.server.getClusterId());
    }
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
      // Negative means "size not computed"; see the guard below before quota accounting.
      long sizeToBeLoaded = -1;

      // Check to see if this bulk load would exceed the space quota for this table
      if (spaceQuotaEnabled) {
        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
        SpaceViolationPolicyEnforcement enforcement =
          activeSpaceQuotas.getPolicyEnforcement(region);
        if (enforcement != null) {
          // Bulk loads must still be atomic. We must enact all or none.
          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
          for (FamilyPath familyPath : request.getFamilyPathList()) {
            filePaths.add(familyPath.getPath());
          }
          // Check if the batch of files exceeds the current quota
          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
        }
      }
      // secure bulk load
      Map<byte[], List<Path>> map =
        server.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
      // A null map means the load did not happen; report loaded=false (recoverable failure).
      builder.setLoaded(map != null);
      if (map != null) {
        // Treat any negative size as a flag to "ignore" updating the region size as that is
        // not possible to occur in real life (cannot bulk load a file with negative size)
        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
              + sizeToBeLoaded + " bytes");
          }
          // Inform space quotas of the new files for this region
          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
            sizeToBeLoaded);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record bulk-load latency even on failure.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
      }
    }
  }
2352
2353  @Override
2354  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
2355    PrepareBulkLoadRequest request) throws ServiceException {
2356    try {
2357      checkOpen();
2358      requestCount.increment();
2359
2360      HRegion region = getRegion(request.getRegion());
2361
2362      String bulkToken = server.getSecureBulkLoadManager().prepareBulkLoad(region, request);
2363      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
2364      builder.setBulkToken(bulkToken);
2365      return builder.build();
2366    } catch (IOException ie) {
2367      throw new ServiceException(ie);
2368    }
2369  }
2370
2371  @Override
2372  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
2373    CleanupBulkLoadRequest request) throws ServiceException {
2374    try {
2375      checkOpen();
2376      requestCount.increment();
2377
2378      HRegion region = getRegion(request.getRegion());
2379
2380      server.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
2381      return CleanupBulkLoadResponse.newBuilder().build();
2382    } catch (IOException ie) {
2383      throw new ServiceException(ie);
2384    }
2385  }
2386
2387  @Override
2388  public CoprocessorServiceResponse execService(final RpcController controller,
2389    final CoprocessorServiceRequest request) throws ServiceException {
2390    try {
2391      checkOpen();
2392      requestCount.increment();
2393      HRegion region = getRegion(request.getRegion());
2394      Message result = execServiceOnRegion(region, request.getCall());
2395      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
2396      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
2397        region.getRegionInfo().getRegionName()));
2398      // TODO: COPIES!!!!!!
2399      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
2400        org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
2401      return builder.build();
2402    } catch (IOException ie) {
2403      throw new ServiceException(ie);
2404    }
2405  }
2406
2407  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
2408    if (filePaths.isEmpty()) {
2409      // local hdfs
2410      return server.getFileSystem();
2411    }
2412    // source hdfs
2413    return new Path(filePaths.get(0)).getFileSystem(server.getConfiguration());
2414  }
2415
2416  private Message execServiceOnRegion(HRegion region,
2417    final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
2418    // ignore the passed in controller (from the serialized call)
2419    ServerRpcController execController = new ServerRpcController();
2420    return region.execService(execController, serviceCall);
2421  }
2422
2423  private boolean shouldRejectRequestsFromClient(HRegion region) {
2424    TableName table = region.getRegionInfo().getTable();
2425    ReplicationSourceService service = server.getReplicationSourceService();
2426    return service != null && service.getSyncReplicationPeerInfoProvider().checkState(table,
2427      RejectRequestsFromClientStateChecker.get());
2428  }
2429
2430  private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
2431    if (shouldRejectRequestsFromClient(region)) {
2432      throw new DoNotRetryIOException(
2433        region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state.");
2434    }
2435  }
2436
  /**
   * Get data from a table.
   * @param controller the RPC controller
   * @param request    the get request
   */
  @Override
  public GetResponse get(final RpcController controller, final GetRequest request)
    throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    OperationQuota quota = null;
    HRegion region = null;
    // Null when invoked outside of a live RPC (see the "for test purpose" branch below).
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    try {
      checkOpen();
      requestCount.increment();
      rpcGetRequestCount.increment();
      region = getRegion(request.getRegion());
      // Reads are refused while the region's table is in sync-replication STANDBY state.
      rejectIfInStandByState(region);

      GetResponse.Builder builder = GetResponse.newBuilder();
      ClientProtos.Get get = request.getGet();
      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
          + "reverse Scan.");
      }
      Boolean existence = null;
      Result r = null;
      quota = getRpcQuotaManager().checkBatchQuota(region, OperationQuota.OperationType.GET);

      Get clientGet = ProtobufUtil.toGet(get);
      // For existence-only gets, give coprocessors first crack; a non-null answer
      // short-circuits the actual read below.
      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
        existence = region.getCoprocessorHost().preExists(clientGet);
      }
      if (existence == null) {
        if (context != null) {
          r = get(clientGet, (region), null, context);
        } else {
          // for test purpose
          r = region.get(clientGet);
        }
        if (get.getExistenceOnly()) {
          boolean exists = r.getExists();
          if (region.getCoprocessorHost() != null) {
            exists = region.getCoprocessorHost().postExists(clientGet, exists);
          }
          existence = exists;
        }
      }
      if (existence != null) {
        // Existence-only response: just a boolean, flagged stale for non-default replicas.
        ClientProtos.Result pbr =
          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
        builder.setResult(pbr);
      } else if (r != null) {
        ClientProtos.Result pbr;
        if (
          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
        ) {
          // Client can take cells out-of-band: ship them in the RPC cell block and send a
          // data-less protobuf Result.
          pbr = ProtobufUtil.toResultNoData(r);
          ((HBaseRpcController) controller)
            .setCellScanner(CellUtil.createCellScanner(r.rawCells()));
          addSize(context, r);
        } else {
          pbr = ProtobufUtil.toResult(r);
        }
        builder.setResult(pbr);
      }
      // r.cells is null when an table.exists(get) call
      if (r != null && r.rawCells() != null) {
        quota.addGetResult(r);
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record per-region get latency even on failure; quota is released last.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null && region != null) {
        long blockBytesScanned = context != null ? context.getBlockBytesScanned() : 0;
        metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
          blockBytesScanned);
      }
      if (quota != null) {
        quota.close();
      }
    }
  }
2528
  /**
   * Server-side get used by the RPC handlers. Mirrors {@code HRegion#get} but hands scanner
   * cleanup to the RPC machinery instead of closing it inline.
   * @param get           the client get to execute
   * @param region        the region to read from
   * @param closeCallBack scanner aggregator used by multi(); when null the scanner is attached to
   *                      {@code context} instead, so context must be non-null in that case
   * @param context       current RPC call context, whose callback closes the scanner
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
    RpcCallContext context) throws IOException {
    region.prepareGet(get);
    // Results served by a non-default replica are flagged stale for the client.
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      // A true return means the coprocessor fully handled the get; bypass the scan.
      if (region.getCoprocessorHost().preGet(get, results)) {
        region.metricsUpdateForGet();
        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
          stale);
      }
    }
    // A get is executed as a single-row scan.
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      // The scanner is NOT closed here; ownership passes to the RPC layer below.
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet();

    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
  }
2578
2579  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
2580    int sum = 0;
2581    String firstRegionName = null;
2582    for (RegionAction regionAction : request.getRegionActionList()) {
2583      if (sum == 0) {
2584        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
2585      }
2586      sum += regionAction.getActionCount();
2587    }
2588    if (sum > rowSizeWarnThreshold) {
2589      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
2590        + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
2591        + RpcServer.getRequestUserName().orElse(null) + "/"
2592        + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
2593      if (rejectRowsWithSizeOverThreshold) {
2594        throw new ServiceException(
2595          "Rejecting large batch operation for current batch with firstRegionName: "
2596            + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
2597            + rowSizeWarnThreshold);
2598      }
2599    }
2600  }
2601
  /**
   * Records the given error as the result of an entire RegionAction and discards the cells that
   * accompanied its mutations, keeping the cell scanner aligned for subsequent RegionActions.
   */
  private void failRegionAction(MultiResponse.Builder responseBuilder,
    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
    CellScanner cellScanner, Throwable error) {
    // Count the failure in RPC metrics before reporting it back to the client.
    rpcServer.getMetrics().exception(error);
    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
    // All Mutations in this RegionAction not executed as we can not see the Region online here
    // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
    // corresponding to these Mutations.
    if (cellScanner != null) {
      skipCellsForMutations(regionAction.getActionList(), cellScanner);
    }
  }
2615
2616  private boolean isReplicationRequest(Action action) {
2617    // replication request can only be put or delete.
2618    if (!action.hasMutation()) {
2619      return false;
2620    }
2621    MutationProto mutation = action.getMutation();
2622    MutationType type = mutation.getMutateType();
2623    if (type != MutationType.PUT && type != MutationType.DELETE) {
2624      return false;
2625    }
2626    // replication will set a special attribute so we can make use of it to decide whether a request
2627    // is for replication.
2628    return mutation.getAttributeList().stream().map(p -> p.getName())
2629      .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
2630  }
2631
2632  /**
2633   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2634   * @param rpcc    the RPC controller
2635   * @param request the multi request
2636   */
2637  @Override
2638  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2639    throws ServiceException {
2640    try {
2641      checkOpen();
2642    } catch (IOException ie) {
2643      throw new ServiceException(ie);
2644    }
2645
2646    checkBatchSizeAndLogLargeSize(request);
2647
2648    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2649    // It is also the conduit via which we pass back data.
2650    HBaseRpcController controller = (HBaseRpcController) rpcc;
2651    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2652    if (controller != null) {
2653      controller.setCellScanner(null);
2654    }
2655
2656    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2657
2658    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2659    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2660    this.rpcMultiRequestCount.increment();
2661    this.requestCount.increment();
2662    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2663
2664    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
2665    // following logic is for backward compatibility as old clients still use
2666    // MultiRequest#condition in case of checkAndMutate with RowMutations.
2667    if (request.hasCondition()) {
2668      if (request.getRegionActionList().isEmpty()) {
2669        // If the region action list is empty, do nothing.
2670        responseBuilder.setProcessed(true);
2671        return responseBuilder.build();
2672      }
2673
2674      RegionAction regionAction = request.getRegionAction(0);
2675
2676      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
2677      // we can assume regionAction.getAtomic() is true here.
2678      assert regionAction.getAtomic();
2679
2680      OperationQuota quota;
2681      HRegion region;
2682      RegionSpecifier regionSpecifier = regionAction.getRegion();
2683
2684      try {
2685        region = getRegion(regionSpecifier);
2686        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
2687          regionAction.hasCondition());
2688      } catch (IOException e) {
2689        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2690        return responseBuilder.build();
2691      }
2692
2693      try {
2694        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
2695        // We only allow replication in standby state and it will not set the atomic flag.
2696        if (rejectIfFromClient) {
2697          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2698            new DoNotRetryIOException(
2699              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2700          return responseBuilder.build();
2701        }
2702
2703        try {
2704          CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2705            cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
2706          responseBuilder.setProcessed(result.isSuccess());
2707          ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2708            ClientProtos.ResultOrException.newBuilder();
2709          for (int i = 0; i < regionAction.getActionCount(); i++) {
2710            // To unify the response format with doNonAtomicRegionMutation and read through
2711            // client's AsyncProcess we have to add an empty result instance per operation
2712            resultOrExceptionOrBuilder.clear();
2713            resultOrExceptionOrBuilder.setIndex(i);
2714            regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2715          }
2716        } catch (IOException e) {
2717          rpcServer.getMetrics().exception(e);
2718          // As it's an atomic operation with a condition, we may expect it's a global failure.
2719          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2720        }
2721      } finally {
2722        quota.close();
2723      }
2724
2725      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2726      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2727      if (regionLoadStats != null) {
2728        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
2729          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
2730      }
2731      return responseBuilder.build();
2732    }
2733
2734    // this will contain all the cells that we need to return. It's created later, if needed.
2735    List<CellScannable> cellsToReturn = null;
2736    RegionScannersCloseCallBack closeCallBack = null;
2737    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2738    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
2739      new HashMap<>(request.getRegionActionCount());
2740
2741    for (RegionAction regionAction : request.getRegionActionList()) {
2742      OperationQuota quota;
2743      HRegion region;
2744      RegionSpecifier regionSpecifier = regionAction.getRegion();
2745      regionActionResultBuilder.clear();
2746
2747      try {
2748        region = getRegion(regionSpecifier);
2749        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
2750          regionAction.hasCondition());
2751      } catch (IOException e) {
2752        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2753        continue; // For this region it's a failure.
2754      }
2755
2756      try {
2757        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
2758
2759        if (regionAction.hasCondition()) {
2760          // We only allow replication in standby state and it will not set the atomic flag.
2761          if (rejectIfFromClient) {
2762            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2763              new DoNotRetryIOException(
2764                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2765            continue;
2766          }
2767
2768          try {
2769            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2770              ClientProtos.ResultOrException.newBuilder();
2771            if (regionAction.getActionCount() == 1) {
2772              CheckAndMutateResult result =
2773                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
2774                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
2775              regionActionResultBuilder.setProcessed(result.isSuccess());
2776              resultOrExceptionOrBuilder.setIndex(0);
2777              if (result.getResult() != null) {
2778                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
2779              }
2780              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2781            } else {
2782              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2783                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
2784              regionActionResultBuilder.setProcessed(result.isSuccess());
2785              for (int i = 0; i < regionAction.getActionCount(); i++) {
2786                if (i == 0 && result.getResult() != null) {
2787                  // Set the result of the Increment/Append operations to the first element of the
2788                  // ResultOrException list
2789                  resultOrExceptionOrBuilder.setIndex(i);
2790                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
2791                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
2792                  continue;
2793                }
2794                // To unify the response format with doNonAtomicRegionMutation and read through
2795                // client's AsyncProcess we have to add an empty result instance per operation
2796                resultOrExceptionOrBuilder.clear();
2797                resultOrExceptionOrBuilder.setIndex(i);
2798                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2799              }
2800            }
2801          } catch (IOException e) {
2802            rpcServer.getMetrics().exception(e);
2803            // As it's an atomic operation with a condition, we may expect it's a global failure.
2804            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2805          }
2806        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2807          // We only allow replication in standby state and it will not set the atomic flag.
2808          if (rejectIfFromClient) {
2809            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2810              new DoNotRetryIOException(
2811                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2812            continue;
2813          }
2814          try {
2815            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
2816              cellScanner, nonceGroup, spaceQuotaEnforcement);
2817            regionActionResultBuilder.setProcessed(true);
2818            // We no longer use MultiResponse#processed. Instead, we use
2819            // RegionActionResult#processed. This is for backward compatibility for old clients.
2820            responseBuilder.setProcessed(true);
2821          } catch (IOException e) {
2822            rpcServer.getMetrics().exception(e);
2823            // As it's atomic, we may expect it's a global failure.
2824            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2825          }
2826        } else {
2827          if (
2828            rejectIfFromClient && regionAction.getActionCount() > 0
2829              && !isReplicationRequest(regionAction.getAction(0))
2830          ) {
2831            // fail if it is not a replication request
2832            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2833              new DoNotRetryIOException(
2834                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2835            continue;
2836          }
2837          // doNonAtomicRegionMutation manages the exception internally
2838          if (context != null && closeCallBack == null) {
2839            // An RpcCallBack that creates a list of scanners that needs to perform callBack
2840            // operation on completion of multiGets.
2841            // Set this only once
2842            closeCallBack = new RegionScannersCloseCallBack();
2843            context.setCallBack(closeCallBack);
2844          }
2845          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2846            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
2847            spaceQuotaEnforcement);
2848        }
2849      } finally {
2850        quota.close();
2851      }
2852
2853      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2854      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2855      if (regionLoadStats != null) {
2856        regionStats.put(regionSpecifier, regionLoadStats);
2857      }
2858    }
2859    // Load the controller with the Cells to return.
2860    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2861      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2862    }
2863
2864    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
2865    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
2866      builder.addRegion(stat.getKey());
2867      builder.addStat(stat.getValue());
2868    }
2869    responseBuilder.setRegionStatistics(builder);
2870    return responseBuilder.build();
2871  }
2872
2873  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2874    if (cellScanner == null) {
2875      return;
2876    }
2877    for (Action action : actions) {
2878      skipCellsForMutation(action, cellScanner);
2879    }
2880  }
2881
2882  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2883    if (cellScanner == null) {
2884      return;
2885    }
2886    try {
2887      if (action.hasMutation()) {
2888        MutationProto m = action.getMutation();
2889        if (m.hasAssociatedCellCount()) {
2890          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2891            cellScanner.advance();
2892          }
2893        }
2894      }
2895    } catch (IOException e) {
2896      // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2897      // marked as failed as we could not see the Region here. At client side the top level
2898      // RegionAction exception will be considered first.
2899      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2900    }
2901  }
2902
2903  /**
2904   * Mutate data in a table.
2905   * @param rpcc    the RPC controller
2906   * @param request the mutate request
2907   */
2908  @Override
2909  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
2910    throws ServiceException {
2911    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2912    // It is also the conduit via which we pass back data.
2913    HBaseRpcController controller = (HBaseRpcController) rpcc;
2914    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2915    OperationQuota quota = null;
2916    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2917    // Clear scanner so we are not holding on to reference across call.
2918    if (controller != null) {
2919      controller.setCellScanner(null);
2920    }
2921    try {
2922      checkOpen();
2923      requestCount.increment();
2924      rpcMutateRequestCount.increment();
2925      HRegion region = getRegion(request.getRegion());
2926      rejectIfInStandByState(region);
2927      MutateResponse.Builder builder = MutateResponse.newBuilder();
2928      MutationProto mutation = request.getMutation();
2929      if (!region.getRegionInfo().isMetaRegion()) {
2930        server.getMemStoreFlusher().reclaimMemStoreMemory();
2931      }
2932      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2933      OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request);
2934      quota = getRpcQuotaManager().checkBatchQuota(region, operationType);
2935      ActivePolicyEnforcement spaceQuotaEnforcement =
2936        getSpaceQuotaManager().getActiveEnforcements();
2937
2938      if (request.hasCondition()) {
2939        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
2940          request.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
2941        builder.setProcessed(result.isSuccess());
2942        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
2943        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
2944        if (clientCellBlockSupported) {
2945          addSize(context, result.getResult());
2946        }
2947      } else {
2948        Result r = null;
2949        Boolean processed = null;
2950        MutationType type = mutation.getMutateType();
2951        switch (type) {
2952          case APPEND:
2953            // TODO: this doesn't actually check anything.
2954            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
2955              context);
2956            break;
2957          case INCREMENT:
2958            // TODO: this doesn't actually check anything.
2959            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
2960              context);
2961            break;
2962          case PUT:
2963            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
2964            processed = Boolean.TRUE;
2965            break;
2966          case DELETE:
2967            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
2968            processed = Boolean.TRUE;
2969            break;
2970          default:
2971            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
2972        }
2973        if (processed != null) {
2974          builder.setProcessed(processed);
2975        }
2976        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
2977        addResult(builder, r, controller, clientCellBlockSupported);
2978        if (clientCellBlockSupported) {
2979          addSize(context, r);
2980        }
2981      }
2982      return builder.build();
2983    } catch (IOException ie) {
2984      server.checkFileSystem();
2985      throw new ServiceException(ie);
2986    } finally {
2987      if (quota != null) {
2988        quota.close();
2989      }
2990    }
2991  }
2992
2993  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
2994    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
2995    long before = EnvironmentEdgeManager.currentTime();
2996    Put put = ProtobufUtil.toPut(mutation, cellScanner);
2997    checkCellSizeLimit(region, put);
2998    spaceQuota.getPolicyEnforcement(region).check(put);
2999    quota.addMutation(put);
3000    region.put(put);
3001
3002    MetricsRegionServer metricsRegionServer = server.getMetrics();
3003    if (metricsRegionServer != null) {
3004      long after = EnvironmentEdgeManager.currentTime();
3005      metricsRegionServer.updatePut(region, after - before);
3006    }
3007  }
3008
3009  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
3010    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
3011    long before = EnvironmentEdgeManager.currentTime();
3012    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
3013    checkCellSizeLimit(region, delete);
3014    spaceQuota.getPolicyEnforcement(region).check(delete);
3015    quota.addMutation(delete);
3016    region.delete(delete);
3017
3018    MetricsRegionServer metricsRegionServer = server.getMetrics();
3019    if (metricsRegionServer != null) {
3020      long after = EnvironmentEdgeManager.currentTime();
3021      metricsRegionServer.updateDelete(region, after - before);
3022    }
3023  }
3024
  /**
   * Execute a single-mutation checkAndMutate: build the {@code CheckAndMutate} from the protobuf
   * condition and mutation, apply cell-size, space-quota and rpc-quota checks, run the coprocessor
   * pre/post hooks around the region operation, and record latency metrics.
   * @param region      region the condition and mutation apply to
   * @param quota       rpc operation quota the mutation is charged against
   * @param mutation    protobuf mutation to execute when the condition holds
   * @param cellScanner carries the mutation's cells when a cell block is used; may be null
   * @param condition   protobuf condition guarding the mutation
   * @param nonceGroup  nonce group of the request; passed through to the region operation
   * @param spaceQuota  active space-quota policy enforcement to check the mutation against
   * @param context     current rpc call context, used for block-bytes-scanned metrics; may be null
   * @return the checkAndMutate result, possibly produced entirely by a coprocessor pre hook
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
    quota.addMutation((Mutation) checkAndMutate.getAction());

    CheckAndMutateResult result = null;
    if (region.getCoprocessorHost() != null) {
      // A coprocessor may short-circuit the operation by returning a non-null result here.
      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
    }
    if (result == null) {
      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
      if (region.getCoprocessorHost() != null) {
        // The post hook only runs when the region actually executed the operation.
        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
      }
    }
    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      long blockBytesScanned =
        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
      metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned);

      // Additionally record the type-specific check-and-put / check-and-delete latency.
      MutationType type = mutation.getMutateType();
      switch (type) {
        case PUT:
          metricsRegionServer.updateCheckAndPut(region, after - before);
          break;
        case DELETE:
          metricsRegionServer.updateCheckAndDelete(region, after - before);
          break;
        default:
          break;
      }
    }
    return result;
  }
3067
  // This is used to keep compatible with the old client implementation. Consider remove it if we
  // decide to drop the support of the client that still sends close request to a region scanner
  // which has already been exhausted.
  @Deprecated
  private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {

    private static final long serialVersionUID = -4305297078988180130L;

    // Shared singleton exception: suppress stack-trace capture, since the throw site carries no
    // useful context and filling in a trace on every throw would be wasted work.
    @Override
    public synchronized Throwable fillInStackTrace() {
      return this;
    }
  };
3081
  /**
   * Resolve the scanner id in the request to its {@link RegionScannerHolder}.
   * <p>
   * Failure modes: (1) the scanner was recently closed and the client retried — tolerated via the
   * shared {@code SCANNER_ALREADY_CLOSED} marker when the call sequence matches (scanner exhausted,
   * one more request arrives), otherwise rejected as out of order; (2) the scanner is entirely
   * unknown; (3) the region behind the scanner is no longer the online instance, in which case the
   * scanner is torn down and {@link NotServingRegionException} is thrown.
   * @throws IOException if the scanner is closed/unknown or its region has changed
   */
  private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
    String scannerName = toScannerName(request.getScannerId());
    RegionScannerHolder rsh = this.scanners.get(scannerName);
    if (rsh == null) {
      // just ignore the next or close request if scanner does not exist.
      Long lastCallSeq = closedScanners.getIfPresent(scannerName);
      if (lastCallSeq != null) {
        // Check the sequence number to catch if the last call was incorrectly retried.
        // The only allowed scenario is when the scanner is exhausted and one more scan
        // request arrives - in this case returning 0 rows is correct.
        if (request.hasNextCallSeq() && request.getNextCallSeq() != lastCallSeq + 1) {
          throw new OutOfOrderScannerNextException("Expected nextCallSeq for closed request: "
            + (lastCallSeq + 1) + " But the nextCallSeq got from client: "
            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
        } else {
          // Stackless shared singleton; cheap signal that the scanner was already closed.
          throw SCANNER_ALREADY_CLOSED;
        }
      } else {
        LOG.warn("Client tried to access missing scanner " + scannerName);
        throw new UnknownScannerException(
          "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
            + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
            + "long wait between consecutive client checkins, c) Server may be closing down, "
            + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
            + "possible fix would be increasing the value of"
            + "'hbase.client.scanner.timeout.period' configuration.");
      }
    }
    rejectIfInStandByState(rsh.r);
    RegionInfo hri = rsh.s.getRegionInfo();
    // Yes, should be the same instance
    if (server.getOnlineRegion(hri.getRegionName()) != rsh.r) {
      String msg = "Region has changed on the scanner " + scannerName + ": regionName="
        + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r;
      LOG.warn(msg + ", closing...");
      scanners.remove(scannerName);
      try {
        rsh.s.close();
      } catch (IOException e) {
        LOG.warn("Getting exception closing " + scannerName, e);
      } finally {
        try {
          // Always cancel the lease, even if closing the scanner itself failed.
          server.getLeaseManager().cancelLease(scannerName);
        } catch (LeaseException e) {
          LOG.warn("Getting exception closing " + scannerName, e);
        }
      }
      throw new NotServingRegionException(msg);
    }
    return rsh;
  }
3133
3134  /**
3135   * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
3136   *         value.
3137   */
3138  private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request,
3139    ScanResponse.Builder builder) throws IOException {
3140    HRegion region = getRegion(request.getRegion());
3141    rejectIfInStandByState(region);
3142    ClientProtos.Scan protoScan = request.getScan();
3143    boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
3144    Scan scan = ProtobufUtil.toScan(protoScan);
3145    // if the request doesn't set this, get the default region setting.
3146    if (!isLoadingCfsOnDemandSet) {
3147      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
3148    }
3149
3150    if (!scan.hasFamilies()) {
3151      // Adding all families to scanner
3152      for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
3153        scan.addFamily(family);
3154      }
3155    }
3156    if (region.getCoprocessorHost() != null) {
3157      // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
3158      // wrapper for the core created RegionScanner
3159      region.getCoprocessorHost().preScannerOpen(scan);
3160    }
3161    RegionScannerImpl coreScanner = region.getScanner(scan);
3162    Shipper shipper = coreScanner;
3163    RegionScanner scanner = coreScanner;
3164    try {
3165      if (region.getCoprocessorHost() != null) {
3166        scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
3167      }
3168    } catch (Exception e) {
3169      // Although region coprocessor is for advanced users and they should take care of the
3170      // implementation to not damage the HBase system, closing the scanner on exception here does
3171      // not have any bad side effect, so let's do it
3172      scanner.close();
3173      throw e;
3174    }
3175    long scannerId = scannerIdGenerator.generateNewScannerId();
3176    builder.setScannerId(scannerId);
3177    builder.setMvccReadPoint(scanner.getMvccReadPoint());
3178    builder.setTtl(scannerLeaseTimeoutPeriod);
3179    String scannerName = toScannerName(scannerId);
3180
3181    boolean fullRegionScan =
3182      !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region);
3183
3184    return new Pair<String, RegionScannerHolder>(scannerName,
3185      addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan));
3186  }
3187
3188  /**
3189   * The returned String is used as key doing look up of outstanding Scanners in this Servers'
3190   * this.scanners, the Map of outstanding scanners and their current state.
3191   * @param scannerId A scanner long id.
3192   * @return The long id as a String.
3193   */
3194  private static String toScannerName(long scannerId) {
3195    return Long.toString(scannerId);
3196  }
3197
3198  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
3199    throws OutOfOrderScannerNextException {
3200    // if nextCallSeq does not match throw Exception straight away. This needs to be
3201    // performed even before checking of Lease.
3202    // See HBASE-5974
3203    if (request.hasNextCallSeq()) {
3204      long callSeq = request.getNextCallSeq();
3205      if (!rsh.incNextCallSeq(callSeq)) {
3206        throw new OutOfOrderScannerNextException(
3207          "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: "
3208            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
3209      }
3210    }
3211  }
3212
  /**
   * Re-register the given scanner lease with the lease manager. Scanner ids are unique, so a
   * still-held lease cannot legitimately occur here.
   */
  private void addScannerLeaseBack(LeaseManager.Lease lease) {
    try {
      server.getLeaseManager().addLease(lease);
    } catch (LeaseStillHeldException e) {
      // should not happen as the scanner id is unique.
      throw new AssertionError(e);
    }
  }
3221
3222  // visible for testing only
3223  long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller,
3224    boolean allowHeartbeatMessages) {
3225    // Set the time limit to be half of the more restrictive timeout value (one of the
3226    // timeout values must be positive). In the event that both values are positive, the
3227    // more restrictive of the two is used to calculate the limit.
3228    if (allowHeartbeatMessages) {
3229      long now = EnvironmentEdgeManager.currentTime();
3230      long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now);
3231      if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) {
3232        long timeLimitDelta;
3233        if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) {
3234          timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout);
3235        } else {
3236          timeLimitDelta =
3237            scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout;
3238        }
3239
3240        // Use half of whichever timeout value was more restrictive... But don't allow
3241        // the time limit to be less than the allowable minimum (could cause an
3242        // immediate timeout before scanning any data).
3243        timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
3244        return now + timeLimitDelta;
3245      }
3246    }
3247    // Default value of timeLimit is negative to indicate no timeLimit should be
3248    // enforced.
3249    return -1L;
3250  }
3251
3252  private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) {
3253    long timeout;
3254    if (controller != null && controller.getCallTimeout() > 0) {
3255      timeout = controller.getCallTimeout();
3256    } else if (rpcTimeout > 0) {
3257      timeout = rpcTimeout;
3258    } else {
3259      return -1;
3260    }
3261    if (call != null) {
3262      timeout -= (now - call.getReceiveTime());
3263    }
3264    // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high.
3265    // return minimum value here in that case so we count this in calculating the final delta.
3266    return Math.max(minimumScanTimeLimitDelta, timeout);
3267  }
3268
3269  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
3270    ScannerContext scannerContext, ScanResponse.Builder builder) {
3271    if (numOfCompleteRows >= limitOfRows) {
3272      if (LOG.isTraceEnabled()) {
3273        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows
3274          + " scannerContext: " + scannerContext);
3275      }
3276      builder.setMoreResults(false);
3277    }
3278  }
3279
3280  // return whether we have more results in region.
3281  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
3282    long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
3283    ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
3284    HRegion region = rsh.r;
3285    RegionScanner scanner = rsh.s;
3286    long maxResultSize;
3287    if (scanner.getMaxResultSize() > 0) {
3288      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
3289    } else {
3290      maxResultSize = maxQuotaResultSize;
3291    }
3292    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
3293    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
3294    // arbitrary 32. TODO: keep record of general size of results being returned.
3295    ArrayList<Cell> values = new ArrayList<>(32);
3296    region.startRegionOperation(Operation.SCAN);
3297    long before = EnvironmentEdgeManager.currentTime();
3298    // Used to check if we've matched the row limit set on the Scan
3299    int numOfCompleteRows = 0;
3300    // Count of times we call nextRaw; can be > numOfCompleteRows.
3301    int numOfNextRawCalls = 0;
3302    try {
3303      int numOfResults = 0;
3304      synchronized (scanner) {
3305        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
3306        boolean clientHandlesPartials =
3307          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
3308        boolean clientHandlesHeartbeats =
3309          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
3310
3311        // On the server side we must ensure that the correct ordering of partial results is
3312        // returned to the client to allow them to properly reconstruct the partial results.
3313        // If the coprocessor host is adding to the result list, we cannot guarantee the
3314        // correct ordering of partial results and so we prevent partial results from being
3315        // formed.
3316        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
3317        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
3318        boolean moreRows = false;
3319
3320        // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
3321        // certain time threshold on the server. When the time threshold is exceeded, the
3322        // server stops the scan and sends back whatever Results it has accumulated within
3323        // that time period (may be empty). Since heartbeat messages have the potential to
3324        // create partial Results (in the event that the timeout occurs in the middle of a
3325        // row), we must only generate heartbeat messages when the client can handle both
3326        // heartbeats AND partials
3327        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
3328
3329        long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);
3330
3331        final LimitScope sizeScope =
3332          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
3333        final LimitScope timeScope =
3334          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
3335
3336        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
3337
3338        // Configure with limits for this RPC. Set keep progress true since size progress
3339        // towards size limit should be kept between calls to nextRaw
3340        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
3341        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
3342        // of heap size occupied by cells(being read). Cell data means its key and value parts.
3343        // maxQuotaResultSize - max results just from server side configuration and quotas, without
3344        // user's specified max. We use this for evaluating limits based on blocks (not cells).
3345        // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
3346        // cell or block size from maximum here so we adhere to total limits of request.
3347        // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
3348        // have accumulated block bytes. If not, this will be 0 for block size.
3349        long maxCellSize = maxResultSize;
3350        long maxBlockSize = maxQuotaResultSize;
3351        if (rpcCall != null) {
3352          maxBlockSize -= rpcCall.getBlockBytesScanned();
3353          maxCellSize -= rpcCall.getResponseCellSize();
3354        }
3355
3356        contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
3357        contextBuilder.setBatchLimit(scanner.getBatch());
3358        contextBuilder.setTimeLimit(timeScope, timeLimit);
3359        contextBuilder.setTrackMetrics(trackMetrics);
3360        ScannerContext scannerContext = contextBuilder.build();
3361        boolean limitReached = false;
3362        while (numOfResults < maxResults) {
3363          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
3364          // batch limit is a limit on the number of cells per Result. Thus, if progress is
3365          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
3366          // reset the batch progress between nextRaw invocations since we don't want the
3367          // batch progress from previous calls to affect future calls
3368          scannerContext.setBatchProgress(0);
3369          assert values.isEmpty();
3370
3371          // Collect values to be returned here
3372          moreRows = scanner.nextRaw(values, scannerContext);
3373          if (rpcCall == null) {
3374            // When there is no RpcCallContext,copy EC to heap, then the scanner would close,
3375            // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
3376            // buffers.See more details in HBASE-26036.
3377            CellUtil.cloneIfNecessary(values);
3378          }
3379          numOfNextRawCalls++;
3380
3381          if (!values.isEmpty()) {
3382            if (limitOfRows > 0) {
3383              // First we need to check if the last result is partial and we have a row change. If
3384              // so then we need to increase the numOfCompleteRows.
3385              if (results.isEmpty()) {
3386                if (
3387                  rsh.rowOfLastPartialResult != null
3388                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
3389                ) {
3390                  numOfCompleteRows++;
3391                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
3392                    builder);
3393                }
3394              } else {
3395                Result lastResult = results.get(results.size() - 1);
3396                if (
3397                  lastResult.mayHaveMoreCellsInRow()
3398                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
3399                ) {
3400                  numOfCompleteRows++;
3401                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
3402                    builder);
3403                }
3404              }
3405              if (builder.hasMoreResults() && !builder.getMoreResults()) {
3406                break;
3407              }
3408            }
3409            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
3410            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
3411            results.add(r);
3412            numOfResults++;
3413            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
3414              numOfCompleteRows++;
3415              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
3416              if (builder.hasMoreResults() && !builder.getMoreResults()) {
3417                break;
3418              }
3419            }
3420          } else if (!moreRows && !results.isEmpty()) {
3421            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
3422            // last result is false. Otherwise it's possible that: the first nextRaw returned
3423            // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
3424            // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
3425            // return with moreRows=false, which means moreResultsInRegion would be false, it will
3426            // be a contradictory state (HBASE-21206).
3427            int lastIdx = results.size() - 1;
3428            Result r = results.get(lastIdx);
3429            if (r.mayHaveMoreCellsInRow()) {
3430              results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
3431            }
3432          }
3433          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
3434          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
3435          boolean resultsLimitReached = numOfResults >= maxResults;
3436          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;
3437
3438          if (limitReached || !moreRows) {
3439            // With block size limit, we may exceed size limit without collecting any results.
3440            // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
3441            // or cursor if results were collected, for example for cell size or heap size limits.
3442            boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
3443            // We only want to mark a ScanResponse as a heartbeat message in the event that
3444            // there are more values to be read server side. If there aren't more values,
3445            // marking it as a heartbeat is wasteful because the client will need to issue
3446            // another ScanRequest only to realize that they already have all the values
3447            if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
3448              // Heartbeat messages occur when the time limit has been reached, or size limit has
3449              // been reached before collecting any results. This can happen for heavily filtered
3450              // scans which scan over too many blocks.
3451              builder.setHeartbeatMessage(true);
3452              if (rsh.needCursor) {
3453                Cell cursorCell = scannerContext.getLastPeekedCell();
3454                if (cursorCell != null) {
3455                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
3456                }
3457              }
3458            }
3459            break;
3460          }
3461          values.clear();
3462        }
3463        if (rpcCall != null) {
3464          rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
3465        }
3466        builder.setMoreResultsInRegion(moreRows);
3467        // Check to see if the client requested that we track metrics server side. If the
3468        // client requested metrics, retrieve the metrics from the scanner context.
3469        if (trackMetrics) {
3470          // rather than increment yet another counter in StoreScanner, just set the value here
3471          // from block size progress before writing into the response
3472          scannerContext.getMetrics().countOfBlockBytesScanned
3473            .set(scannerContext.getBlockSizeProgress());
3474          if (rpcCall != null) {
3475            scannerContext.getMetrics().fsReadTime.set(rpcCall.getFsReadTime());
3476          }
3477          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
3478          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
3479          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
3480
3481          for (Entry<String, Long> entry : metrics.entrySet()) {
3482            pairBuilder.setName(entry.getKey());
3483            pairBuilder.setValue(entry.getValue());
3484            metricBuilder.addMetrics(pairBuilder.build());
3485          }
3486
3487          builder.setScanMetrics(metricBuilder.build());
3488        }
3489      }
3490    } finally {
3491      region.closeRegionOperation();
3492      // Update serverside metrics, even on error.
3493      long end = EnvironmentEdgeManager.currentTime();
3494
3495      long responseCellSize = 0;
3496      long blockBytesScanned = 0;
3497      if (rpcCall != null) {
3498        responseCellSize = rpcCall.getResponseCellSize();
3499        blockBytesScanned = rpcCall.getBlockBytesScanned();
3500        rsh.updateBlockBytesScanned(blockBytesScanned);
3501      }
3502      region.getMetrics().updateScan();
3503      final MetricsRegionServer metricsRegionServer = server.getMetrics();
3504      if (metricsRegionServer != null) {
3505        metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned);
3506        metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls);
3507      }
3508    }
3509    // coprocessor postNext hook
3510    if (region.getCoprocessorHost() != null) {
3511      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
3512    }
3513  }
3514
3515  /**
3516   * Scan data in a table.
3517   * @param controller the RPC controller
3518   * @param request    the scan request
3519   */
3520  @Override
3521  public ScanResponse scan(final RpcController controller, final ScanRequest request)
3522    throws ServiceException {
3523    if (controller != null && !(controller instanceof HBaseRpcController)) {
3524      throw new UnsupportedOperationException(
3525        "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
3526    }
3527    if (!request.hasScannerId() && !request.hasScan()) {
3528      throw new ServiceException(
3529        new DoNotRetryIOException("Missing required input: scannerId or scan"));
3530    }
3531    try {
3532      checkOpen();
3533    } catch (IOException e) {
3534      if (request.hasScannerId()) {
3535        String scannerName = toScannerName(request.getScannerId());
3536        if (LOG.isDebugEnabled()) {
3537          LOG.debug(
3538            "Server shutting down and client tried to access missing scanner " + scannerName);
3539        }
3540        final LeaseManager leaseManager = server.getLeaseManager();
3541        if (leaseManager != null) {
3542          try {
3543            leaseManager.cancelLease(scannerName);
3544          } catch (LeaseException le) {
3545            // No problem, ignore
3546            if (LOG.isTraceEnabled()) {
3547              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
3548            }
3549          }
3550        }
3551      }
3552      throw new ServiceException(e);
3553    }
3554    requestCount.increment();
3555    rpcScanRequestCount.increment();
3556    RegionScannerHolder rsh;
3557    ScanResponse.Builder builder = ScanResponse.newBuilder();
3558    String scannerName;
3559    try {
3560      if (request.hasScannerId()) {
3561        // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
3562        // for more details.
3563        long scannerId = request.getScannerId();
3564        builder.setScannerId(scannerId);
3565        scannerName = toScannerName(scannerId);
3566        rsh = getRegionScanner(request);
3567      } else {
3568        Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder);
3569        scannerName = scannerNameAndRSH.getFirst();
3570        rsh = scannerNameAndRSH.getSecond();
3571      }
3572    } catch (IOException e) {
3573      if (e == SCANNER_ALREADY_CLOSED) {
3574        // Now we will close scanner automatically if there are no more results for this region but
3575        // the old client will still send a close request to us. Just ignore it and return.
3576        return builder.build();
3577      }
3578      throw new ServiceException(e);
3579    }
3580    if (rsh.fullRegionScan) {
3581      rpcFullScanRequestCount.increment();
3582    }
3583    HRegion region = rsh.r;
3584    LeaseManager.Lease lease;
3585    try {
3586      // Remove lease while its being processed in server; protects against case
3587      // where processing of request takes > lease expiration time. or null if none found.
3588      lease = server.getLeaseManager().removeLease(scannerName);
3589    } catch (LeaseException e) {
3590      throw new ServiceException(e);
3591    }
3592    if (request.hasRenew() && request.getRenew()) {
3593      // add back and return
3594      addScannerLeaseBack(lease);
3595      try {
3596        checkScanNextCallSeq(request, rsh);
3597      } catch (OutOfOrderScannerNextException e) {
3598        throw new ServiceException(e);
3599      }
3600      return builder.build();
3601    }
3602    OperationQuota quota;
3603    try {
3604      quota = getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize,
3605        rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference());
3606    } catch (IOException e) {
3607      addScannerLeaseBack(lease);
3608      throw new ServiceException(e);
3609    }
3610    try {
3611      checkScanNextCallSeq(request, rsh);
3612    } catch (OutOfOrderScannerNextException e) {
3613      addScannerLeaseBack(lease);
3614      throw new ServiceException(e);
3615    }
3616    // Now we have increased the next call sequence. If we give client an error, the retry will
3617    // never success. So we'd better close the scanner and return a DoNotRetryIOException to client
3618    // and then client will try to open a new scanner.
3619    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
3620    int rows; // this is scan.getCaching
3621    if (request.hasNumberOfRows()) {
3622      rows = request.getNumberOfRows();
3623    } else {
3624      rows = closeScanner ? 0 : 1;
3625    }
3626    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
3627    // now let's do the real scan.
3628    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
3629    RegionScanner scanner = rsh.s;
3630    // this is the limit of rows for this scan, if we the number of rows reach this value, we will
3631    // close the scanner.
3632    int limitOfRows;
3633    if (request.hasLimitOfRows()) {
3634      limitOfRows = request.getLimitOfRows();
3635    } else {
3636      limitOfRows = -1;
3637    }
3638    boolean scannerClosed = false;
3639    try {
3640      List<Result> results = new ArrayList<>(Math.min(rows, 512));
3641      if (rows > 0) {
3642        boolean done = false;
3643        // Call coprocessor. Get region info from scanner.
3644        if (region.getCoprocessorHost() != null) {
3645          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
3646          if (!results.isEmpty()) {
3647            for (Result r : results) {
3648              // add cell size from CP results so we can track response size and update limits
3649              // when calling scan below if !done. We'll also have tracked block size if the CP
3650              // got results from hbase, since StoreScanner tracks that for all calls automatically.
3651              addSize(rpcCall, r);
3652            }
3653          }
3654          if (bypass != null && bypass.booleanValue()) {
3655            done = true;
3656          }
3657        }
3658        if (!done) {
3659          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
3660            results, builder, rpcCall);
3661        } else {
3662          builder.setMoreResultsInRegion(!results.isEmpty());
3663        }
3664      } else {
3665        // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
3666        builder.setMoreResultsInRegion(true);
3667      }
3668
3669      quota.addScanResult(results);
3670      addResults(builder, results, (HBaseRpcController) controller,
3671        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
3672        isClientCellBlockSupport(rpcCall));
3673      if (scanner.isFilterDone() && results.isEmpty()) {
3674        // If the scanner's filter - if any - is done with the scan
3675        // only set moreResults to false if the results is empty. This is used to keep compatible
3676        // with the old scan implementation where we just ignore the returned results if moreResults
3677        // is false. Can remove the isEmpty check after we get rid of the old implementation.
3678        builder.setMoreResults(false);
3679      }
3680      // Later we may close the scanner depending on this flag so here we need to make sure that we
3681      // have already set this flag.
3682      assert builder.hasMoreResultsInRegion();
3683      // we only set moreResults to false in the above code, so set it to true if we haven't set it
3684      // yet.
3685      if (!builder.hasMoreResults()) {
3686        builder.setMoreResults(true);
3687      }
3688      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
3689        // Record the last cell of the last result if it is a partial result
3690        // We need this to calculate the complete rows we have returned to client as the
3691        // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
3692        // current row. We may filter out all the remaining cells for the current row and just
3693        // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
3694        // check for row change.
3695        Result lastResult = results.get(results.size() - 1);
3696        if (lastResult.mayHaveMoreCellsInRow()) {
3697          rsh.rowOfLastPartialResult = lastResult.getRow();
3698        } else {
3699          rsh.rowOfLastPartialResult = null;
3700        }
3701      }
3702      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
3703        scannerClosed = true;
3704        closeScanner(region, scanner, scannerName, rpcCall, false);
3705      }
3706
3707      // There's no point returning to a timed out client. Throwing ensures scanner is closed
3708      if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) {
3709        throw new TimeoutIOException("Client deadline exceeded, cannot return results");
3710      }
3711
3712      return builder.build();
3713    } catch (IOException e) {
3714      try {
3715        // scanner is closed here
3716        scannerClosed = true;
3717        // The scanner state might be left in a dirty state, so we will tell the Client to
3718        // fail this RPC and close the scanner while opening up another one from the start of
3719        // row that the client has last seen.
3720        closeScanner(region, scanner, scannerName, rpcCall, true);
3721
3722        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
3723        // used in two different semantics.
3724        // (1) The first is to close the client scanner and bubble up the exception all the way
3725        // to the application. This is preferred when the exception is really un-recoverable
3726        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
3727        // bucket usually.
3728        // (2) Second semantics is to close the current region scanner only, but continue the
3729        // client scanner by overriding the exception. This is usually UnknownScannerException,
3730        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
3731        // application-level ClientScanner has to continue without bubbling up the exception to
3732        // the client. See ClientScanner code to see how it deals with these special exceptions.
3733        if (e instanceof DoNotRetryIOException) {
3734          throw e;
3735        }
3736
3737        // If it is a FileNotFoundException, wrap as a
3738        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
3739        if (e instanceof FileNotFoundException) {
3740          throw new DoNotRetryIOException(e);
3741        }
3742
3743        // We closed the scanner already. Instead of throwing the IOException, and client
3744        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
3745        // a special exception to save an RPC.
3746        if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
3747          // 1.4.0+ clients know how to handle
3748          throw new ScannerResetException("Scanner is closed on the server-side", e);
3749        } else {
3750          // older clients do not know about SRE. Just throw USE, which they will handle
3751          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
3752            + " scanner state for clients older than 1.3.", e);
3753        }
3754      } catch (IOException ioe) {
3755        throw new ServiceException(ioe);
3756      }
3757    } finally {
3758      if (!scannerClosed) {
3759        // Adding resets expiration time on lease.
3760        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
3761        if (rpcCall != null) {
3762          rpcCall.setCallBack(rsh.shippedCallback);
3763        } else {
3764          // If context is null,here we call rsh.shippedCallback directly to reuse the logic in
3765          // rsh.shippedCallback to release the internal resources in rsh,and lease is also added
3766          // back to regionserver's LeaseManager in rsh.shippedCallback.
3767          runShippedCallback(rsh);
3768        }
3769      }
3770      quota.close();
3771    }
3772  }
3773
3774  private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
3775    assert rsh.shippedCallback != null;
3776    try {
3777      rsh.shippedCallback.run();
3778    } catch (IOException ioe) {
3779      throw new ServiceException(ioe);
3780    }
3781  }
3782
3783  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
3784    RpcCallContext context, boolean isError) throws IOException {
3785    if (region.getCoprocessorHost() != null) {
3786      if (region.getCoprocessorHost().preScannerClose(scanner)) {
3787        // bypass the actual close.
3788        return;
3789      }
3790    }
3791    RegionScannerHolder rsh = scanners.remove(scannerName);
3792    if (rsh != null) {
3793      if (context != null) {
3794        context.setCallBack(rsh.closeCallBack);
3795      } else {
3796        rsh.s.close();
3797      }
3798      if (region.getCoprocessorHost() != null) {
3799        region.getCoprocessorHost().postScannerClose(scanner);
3800      }
3801      if (!isError) {
3802        closedScanners.put(scannerName, rsh.getNextCallSeq());
3803      }
3804    }
3805  }
3806
  /**
   * Executes a region-server-level coprocessor endpoint invocation by delegating to the server
   * after the standard pre-call checks.
   */
  @Override
  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
    CoprocessorServiceRequest request) throws ServiceException {
    rpcPreCheck("execRegionServerService");
    return server.execRegionServerService(controller, request);
  }
3813
3814  @Override
3815  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
3816    GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
3817    try {
3818      final RegionServerSpaceQuotaManager manager = server.getRegionServerSpaceQuotaManager();
3819      final GetSpaceQuotaSnapshotsResponse.Builder builder =
3820        GetSpaceQuotaSnapshotsResponse.newBuilder();
3821      if (manager != null) {
3822        final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
3823        for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
3824          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
3825            .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
3826            .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build());
3827        }
3828      }
3829      return builder.build();
3830    } catch (Exception e) {
3831      throw new ServiceException(e);
3832    }
3833  }
3834
3835  @Override
3836  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
3837    ClearRegionBlockCacheRequest request) throws ServiceException {
3838    try {
3839      rpcPreCheck("clearRegionBlockCache");
3840      ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder();
3841      CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
3842      server.getRegionServerCoprocessorHost().preClearRegionBlockCache();
3843      List<HRegion> regions = getRegions(request.getRegionList(), stats);
3844      for (HRegion region : regions) {
3845        try {
3846          stats = stats.append(this.server.clearRegionBlockCache(region));
3847        } catch (Exception e) {
3848          stats.addException(region.getRegionInfo().getRegionName(), e);
3849        }
3850      }
3851      stats.withMaxCacheSize(server.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
3852      server.getRegionServerCoprocessorHost().postClearRegionBlockCache(stats.build());
3853      return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
3854    } catch (IOException e) {
3855      throw new ServiceException(e);
3856    }
3857  }
3858
3859  private void executeOpenRegionProcedures(OpenRegionRequest request,
3860    Map<TableName, TableDescriptor> tdCache) {
3861    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
3862    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
3863      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
3864      TableName tableName = regionInfo.getTable();
3865      TableDescriptor tableDesc = tdCache.get(tableName);
3866      if (tableDesc == null) {
3867        try {
3868          tableDesc = server.getTableDescriptors().get(regionInfo.getTable());
3869        } catch (IOException e) {
3870          // Here we do not fail the whole method since we also need deal with other
3871          // procedures, and we can not ignore this one, so we still schedule a
3872          // AssignRegionHandler and it will report back to master if we still can not get the
3873          // TableDescriptor.
3874          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
3875            regionInfo.getTable(), e);
3876        }
3877        if (tableDesc != null) {
3878          tdCache.put(tableName, tableDesc);
3879        }
3880      }
3881      if (regionOpenInfo.getFavoredNodesCount() > 0) {
3882        server.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
3883          regionOpenInfo.getFavoredNodesList());
3884      }
3885      long procId = regionOpenInfo.getOpenProcId();
3886      if (server.submitRegionProcedure(procId)) {
3887        server.getExecutorService().submit(
3888          AssignRegionHandler.create(server, regionInfo, procId, tableDesc, masterSystemTime));
3889      }
3890    }
3891  }
3892
3893  private void executeCloseRegionProcedures(CloseRegionRequest request) {
3894    String encodedName;
3895    try {
3896      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
3897    } catch (DoNotRetryIOException e) {
3898      throw new UncheckedIOException("Should not happen", e);
3899    }
3900    ServerName destination = request.hasDestinationServer()
3901      ? ProtobufUtil.toServerName(request.getDestinationServer())
3902      : null;
3903    long procId = request.getCloseProcId();
3904    boolean evictCache = request.getEvictCache();
3905    if (server.submitRegionProcedure(procId)) {
3906      server.getExecutorService().submit(
3907        UnassignRegionHandler.create(server, encodedName, procId, false, destination, evictCache));
3908    }
3909  }
3910
3911  private void executeProcedures(RemoteProcedureRequest request) {
3912    RSProcedureCallable callable;
3913    try {
3914      callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
3915        .getDeclaredConstructor().newInstance();
3916    } catch (Exception e) {
3917      LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(),
3918        request.getProcId(), e);
3919      server.remoteProcedureComplete(request.getProcId(), e);
3920      return;
3921    }
3922    callable.init(request.getProcData().toByteArray(), server);
3923    LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
3924    server.executeProcedure(request.getProcId(), callable);
3925  }
3926
3927  @Override
3928  @QosPriority(priority = HConstants.ADMIN_QOS)
3929  public ExecuteProceduresResponse executeProcedures(RpcController controller,
3930    ExecuteProceduresRequest request) throws ServiceException {
3931    try {
3932      checkOpen();
3933      throwOnWrongStartCode(request);
3934      server.getRegionServerCoprocessorHost().preExecuteProcedures();
3935      if (request.getOpenRegionCount() > 0) {
3936        // Avoid reading from the TableDescritor every time(usually it will read from the file
3937        // system)
3938        Map<TableName, TableDescriptor> tdCache = new HashMap<>();
3939        request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
3940      }
3941      if (request.getCloseRegionCount() > 0) {
3942        request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
3943      }
3944      if (request.getProcCount() > 0) {
3945        request.getProcList().forEach(this::executeProcedures);
3946      }
3947      server.getRegionServerCoprocessorHost().postExecuteProcedures();
3948      return ExecuteProceduresResponse.getDefaultInstance();
3949    } catch (IOException e) {
3950      throw new ServiceException(e);
3951    }
3952  }
3953
3954  @Override
3955  public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller,
3956    GetAllBootstrapNodesRequest request) throws ServiceException {
3957    GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder();
3958    server.getBootstrapNodes()
3959      .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server)));
3960    return builder.build();
3961  }
3962
3963  private void setReloadableGuardrails(Configuration conf) {
3964    rowSizeWarnThreshold =
3965      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
3966    rejectRowsWithSizeOverThreshold =
3967      conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
3968    maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
3969      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
3970  }
3971
  /**
   * Invoked on online configuration reload; refreshes the reloadable guardrail settings after the
   * superclass has processed the change.
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    super.onConfigurationChange(conf);
    setReloadableGuardrails(conf);
  }
3977
3978  @Override
3979  public GetCachedFilesListResponse getCachedFilesList(RpcController controller,
3980    GetCachedFilesListRequest request) throws ServiceException {
3981    GetCachedFilesListResponse.Builder responseBuilder = GetCachedFilesListResponse.newBuilder();
3982    List<String> fullyCachedFiles = new ArrayList<>();
3983    server.getBlockCache().flatMap(BlockCache::getFullyCachedFiles).ifPresent(fcf -> {
3984      fullyCachedFiles.addAll(fcf.keySet());
3985    });
3986    return responseBuilder.addAllCachedFiles(fullyCachedFiles).build();
3987  }
3988}