001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.io.UncheckedIOException;
024import java.net.BindException;
025import java.net.InetAddress;
026import java.net.InetSocketAddress;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.Iterator;
032import java.util.List;
033import java.util.Map;
034import java.util.Map.Entry;
035import java.util.NavigableMap;
036import java.util.Set;
037import java.util.TreeSet;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentMap;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicBoolean;
042import java.util.concurrent.atomic.AtomicLong;
043import java.util.concurrent.atomic.LongAdder;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.FileSystem;
046import org.apache.hadoop.fs.Path;
047import org.apache.hadoop.hbase.CacheEvictionStats;
048import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
049import org.apache.hadoop.hbase.Cell;
050import org.apache.hadoop.hbase.CellScannable;
051import org.apache.hadoop.hbase.CellScanner;
052import org.apache.hadoop.hbase.CellUtil;
053import org.apache.hadoop.hbase.DoNotRetryIOException;
054import org.apache.hadoop.hbase.DroppedSnapshotException;
055import org.apache.hadoop.hbase.HBaseIOException;
056import org.apache.hadoop.hbase.HBaseRpcServicesBase;
057import org.apache.hadoop.hbase.HConstants;
058import org.apache.hadoop.hbase.MultiActionResultTooLarge;
059import org.apache.hadoop.hbase.NotServingRegionException;
060import org.apache.hadoop.hbase.PrivateCellUtil;
061import org.apache.hadoop.hbase.RegionTooBusyException;
062import org.apache.hadoop.hbase.Server;
063import org.apache.hadoop.hbase.ServerName;
064import org.apache.hadoop.hbase.TableName;
065import org.apache.hadoop.hbase.UnknownScannerException;
066import org.apache.hadoop.hbase.client.Append;
067import org.apache.hadoop.hbase.client.CheckAndMutate;
068import org.apache.hadoop.hbase.client.CheckAndMutateResult;
069import org.apache.hadoop.hbase.client.Delete;
070import org.apache.hadoop.hbase.client.Durability;
071import org.apache.hadoop.hbase.client.Get;
072import org.apache.hadoop.hbase.client.Increment;
073import org.apache.hadoop.hbase.client.Mutation;
074import org.apache.hadoop.hbase.client.OperationWithAttributes;
075import org.apache.hadoop.hbase.client.Put;
076import org.apache.hadoop.hbase.client.RegionInfo;
077import org.apache.hadoop.hbase.client.RegionReplicaUtil;
078import org.apache.hadoop.hbase.client.Result;
079import org.apache.hadoop.hbase.client.Row;
080import org.apache.hadoop.hbase.client.Scan;
081import org.apache.hadoop.hbase.client.TableDescriptor;
082import org.apache.hadoop.hbase.client.VersionInfoUtil;
083import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
084import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
085import org.apache.hadoop.hbase.exceptions.ScannerResetException;
086import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
087import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
088import org.apache.hadoop.hbase.io.ByteBuffAllocator;
089import org.apache.hadoop.hbase.io.hfile.BlockCache;
090import org.apache.hadoop.hbase.ipc.HBaseRpcController;
091import org.apache.hadoop.hbase.ipc.PriorityFunction;
092import org.apache.hadoop.hbase.ipc.QosPriority;
093import org.apache.hadoop.hbase.ipc.RpcCall;
094import org.apache.hadoop.hbase.ipc.RpcCallContext;
095import org.apache.hadoop.hbase.ipc.RpcCallback;
096import org.apache.hadoop.hbase.ipc.RpcServer;
097import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
098import org.apache.hadoop.hbase.ipc.RpcServerFactory;
099import org.apache.hadoop.hbase.ipc.RpcServerInterface;
100import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
101import org.apache.hadoop.hbase.ipc.ServerRpcController;
102import org.apache.hadoop.hbase.net.Address;
103import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
104import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
105import org.apache.hadoop.hbase.quotas.OperationQuota;
106import org.apache.hadoop.hbase.quotas.QuotaUtil;
107import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
108import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
109import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
110import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
111import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease;
112import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException;
113import org.apache.hadoop.hbase.regionserver.Region.Operation;
114import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
115import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
116import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler;
117import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
118import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
119import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
120import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
121import org.apache.hadoop.hbase.replication.ReplicationUtils;
122import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker;
123import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker;
124import org.apache.hadoop.hbase.security.Superusers;
125import org.apache.hadoop.hbase.security.access.Permission;
126import org.apache.hadoop.hbase.util.Bytes;
127import org.apache.hadoop.hbase.util.DNS;
128import org.apache.hadoop.hbase.util.DNS.ServerType;
129import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
130import org.apache.hadoop.hbase.util.Pair;
131import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
132import org.apache.hadoop.hbase.wal.WAL;
133import org.apache.hadoop.hbase.wal.WALEdit;
134import org.apache.hadoop.hbase.wal.WALKey;
135import org.apache.hadoop.hbase.wal.WALSplitUtil;
136import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
137import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
138import org.apache.yetus.audience.InterfaceAudience;
139import org.slf4j.Logger;
140import org.slf4j.LoggerFactory;
141
142import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
143import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
144import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
145import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
146import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
147import org.apache.hbase.thirdparty.com.google.protobuf.Message;
148import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
149import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
150import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
151import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
152import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
153
154import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
155import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
156import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
243
244/**
245 * Implements the regionserver RPC services.
246 */
247@InterfaceAudience.Private
248public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
249  implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface {
250
  private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);

  /** RPC scheduler to use for the region server. */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Upper bound on the aggregate result size a single scan RPC may return; initialized in the
  // constructor (not visible in this chunk) — presumably from configuration, confirm there.
  private final long maxScannerResultSize;

  // Produces the unique ids handed out for region scanners. Assigned after construction
  // (not final) — presumably during server startup; confirm where it is set.
  private ScannerIdGenerator scannerIdGenerator;
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
  // which may send next or close request to a region scanner which has already been exhausted. The
  // entries will be removed automatically after scannerLeaseTimeoutPeriod.
  private final Cache<String, String> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;
  /*
   * Whether we should reject requests with very high no of rows i.e. beyond threshold defined by
   * rowSizeWarnThreshold
   */
  private final boolean rejectRowsWithSizeOverThreshold;

  // Flag presumably used to coalesce concurrent clearCompactionQueues RPCs (only one clear in
  // flight at a time); confirm against the clearCompactionQueues handler.
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below booleans
   * to selectively enable/disable either Admin or Client Service (Rare is the case where you would
   * ever turn off one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
    "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
    "hbase.regionserver.client.executorService";
  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
    "hbase.regionserver.client.meta.executorService";
346
347  /**
348   * An Rpc callback for closing a RegionScanner.
349   */
350  private static final class RegionScannerCloseCallBack implements RpcCallback {
351
352    private final RegionScanner scanner;
353
354    public RegionScannerCloseCallBack(RegionScanner scanner) {
355      this.scanner = scanner;
356    }
357
358    @Override
359    public void run() throws IOException {
360      this.scanner.close();
361    }
362  }
363
364  /**
365   * An Rpc callback for doing shipped() call on a RegionScanner.
366   */
367  private class RegionScannerShippedCallBack implements RpcCallback {
368    private final String scannerName;
369    private final Shipper shipper;
370    private final Lease lease;
371
372    public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
373      this.scannerName = scannerName;
374      this.shipper = shipper;
375      this.lease = lease;
376    }
377
378    @Override
379    public void run() throws IOException {
380      this.shipper.shipped();
381      // We're done. On way out re-add the above removed lease. The lease was temp removed for this
382      // Rpc call and we are at end of the call now. Time to add it back.
383      if (scanners.containsKey(scannerName)) {
384        if (lease != null) {
385          server.getLeaseManager().addLease(lease);
386        }
387      }
388    }
389  }
390
391  /**
392   * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
393   * completion of multiGets.
394   */
395  static class RegionScannersCloseCallBack implements RpcCallback {
396    private final List<RegionScanner> scanners = new ArrayList<>();
397
398    public void addScanner(RegionScanner scanner) {
399      this.scanners.add(scanner);
400    }
401
402    @Override
403    public void run() {
404      for (RegionScanner scanner : scanners) {
405        try {
406          scanner.close();
407        } catch (IOException e) {
408          LOG.error("Exception while closing the scanner " + scanner, e);
409        }
410      }
411    }
412  }
413
414  /**
415   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
416   */
417  static final class RegionScannerHolder {
418    private final AtomicLong nextCallSeq = new AtomicLong(0);
419    private final RegionScanner s;
420    private final HRegion r;
421    private final RpcCallback closeCallBack;
422    private final RpcCallback shippedCallback;
423    private byte[] rowOfLastPartialResult;
424    private boolean needCursor;
425    private boolean fullRegionScan;
426    private final String clientIPAndPort;
427    private final String userName;
428
429    RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
430      RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
431      String clientIPAndPort, String userName) {
432      this.s = s;
433      this.r = r;
434      this.closeCallBack = closeCallBack;
435      this.shippedCallback = shippedCallback;
436      this.needCursor = needCursor;
437      this.fullRegionScan = fullRegionScan;
438      this.clientIPAndPort = clientIPAndPort;
439      this.userName = userName;
440    }
441
442    long getNextCallSeq() {
443      return nextCallSeq.get();
444    }
445
446    boolean incNextCallSeq(long currentSeq) {
447      // Use CAS to prevent multiple scan request running on the same scanner.
448      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
449    }
450
451    // Should be called only when we need to print lease expired messages otherwise
452    // cache the String once made.
453    @Override
454    public String toString() {
455      return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName
456        + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString();
457    }
458  }
459
460  /**
461   * Instantiated as a scanner lease. If the lease times out, the scanner is closed
462   */
463  private class ScannerListener implements LeaseListener {
464    private final String scannerName;
465
466    ScannerListener(final String n) {
467      this.scannerName = n;
468    }
469
470    @Override
471    public void leaseExpired() {
472      RegionScannerHolder rsh = scanners.remove(this.scannerName);
473      if (rsh == null) {
474        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
475        return;
476      }
477      LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
478      server.getMetrics().incrScannerLeaseExpired();
479      RegionScanner s = rsh.s;
480      HRegion region = null;
481      try {
482        region = server.getRegion(s.getRegionInfo().getRegionName());
483        if (region != null && region.getCoprocessorHost() != null) {
484          region.getCoprocessorHost().preScannerClose(s);
485        }
486      } catch (IOException e) {
487        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
488      } finally {
489        try {
490          s.close();
491          if (region != null && region.getCoprocessorHost() != null) {
492            region.getCoprocessorHost().postScannerClose(s);
493          }
494        } catch (IOException e) {
495          LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
496        }
497      }
498    }
499  }
500
501  private static ResultOrException getResultOrException(final ClientProtos.Result r,
502    final int index) {
503    return getResultOrException(ResponseConverter.buildActionResult(r), index);
504  }
505
506  private static ResultOrException getResultOrException(final Exception e, final int index) {
507    return getResultOrException(ResponseConverter.buildActionResult(e), index);
508  }
509
510  private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
511    final int index) {
512    return builder.setIndex(index).build();
513  }
514
515  /**
516   * Checks for the following pre-checks in order:
517   * <ol>
518   * <li>RegionServer is running</li>
519   * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
520   * </ol>
521   * @param requestName name of rpc request. Used in reporting failures to provide context.
522   * @throws ServiceException If any of the above listed pre-check fails.
523   */
524  private void rpcPreCheck(String requestName) throws ServiceException {
525    try {
526      checkOpen();
527      requirePermission(requestName, Permission.Action.ADMIN);
528    } catch (IOException ioe) {
529      throw new ServiceException(ioe);
530    }
531  }
532
533  private boolean isClientCellBlockSupport(RpcCallContext context) {
534    return context != null && context.isClientCellBlockSupported();
535  }
536
537  private void addResult(final MutateResponse.Builder builder, final Result result,
538    final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
539    if (result == null) return;
540    if (clientCellBlockSupported) {
541      builder.setResult(ProtobufUtil.toResultNoData(result));
542      rpcc.setCellScanner(result.cellScanner());
543    } else {
544      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
545      builder.setResult(pbr);
546    }
547  }
548
549  private void addResults(ScanResponse.Builder builder, List<Result> results,
550    HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
551    builder.setStale(!isDefaultRegion);
552    if (results.isEmpty()) {
553      return;
554    }
555    if (clientCellBlockSupported) {
556      for (Result res : results) {
557        builder.addCellsPerResult(res.size());
558        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
559      }
560      controller.setCellScanner(CellUtil.createCellScanner(results));
561    } else {
562      for (Result res : results) {
563        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
564        builder.addResults(pbr);
565      }
566    }
567  }
568
  /**
   * Runs the mutations of an atomic check-and-mutate request against {@code region}.
   * <p>
   * Each proto action is converted into a client {@link Mutation} (with cell-size and space-quota
   * checks applied as it is converted); the assembled {@link CheckAndMutate} is then handed to the
   * region, giving the coprocessor host the chance to bypass (pre-hook returning non-null) or
   * post-process the result.
   * @param region the region the condition and mutations apply to
   * @param actions proto actions; must contain only mutations (a Get is rejected)
   * @param cellScanner carries the cell payloads referenced by the mutation protos
   * @param condition the condition gating the mutations
   * @param nonceGroup client nonce group, forwarded to the region for increment/append replay
   * @param spaceQuotaEnforcement active space-quota policies each mutation is checked against
   * @return the check-and-mutate result; success with a null result when there were no mutations
   * @throws IOException if an action is invalid, a size/quota check fails, or the region op fails
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
    CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Number of actions whose cells have been fully consumed from cellScanner. The finally block
    // uses it to skip the cells of any actions that were never converted, keeping the scanner
    // positioned correctly for whatever is processed after this call.
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      List<Mutation> mutations = new ArrayList<>();
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : actions) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto mutation = action.getMutation();
        MutationType type = mutation.getMutateType();
        switch (type) {
          case PUT:
            // toPut consumes this action's cells from cellScanner, so bump the counter right away.
            Put put = ProtobufUtil.toPut(mutation, cellScanner);
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            mutations.add(put);
            break;
          case DELETE:
            // NOTE(review): no checkCellSizeLimit here, presumably because delete markers carry
            // no large cell values -- confirm this is intentional.
            Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
            ++countOfCompleteMutation;
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            mutations.add(del);
            break;
          case INCREMENT:
            Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
            // Increments are non-idempotent, so honor the client-supplied nonce when present.
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, increment);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
            mutations.add(increment);
            break;
          case APPEND:
            Append append = ProtobufUtil.toAppend(mutation, cellScanner);
            // Appends are non-idempotent, so honor the client-supplied nonce when present.
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, append);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
            mutations.add(append);
            break;
          default:
            throw new DoNotRetryIOException("invalid mutation type : " + type);
        }
      }

      if (mutations.size() == 0) {
        // Nothing to apply; report the operation as having succeeded with no result.
        return new CheckAndMutateResult(true, null);
      } else {
        CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
        CheckAndMutateResult result = null;
        if (region.getCoprocessorHost() != null) {
          // A non-null pre-hook result bypasses the default region operation entirely.
          result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
        }
        if (result == null) {
          result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
          if (region.getCoprocessorHost() != null) {
            result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
          }
        }
        return result;
      }
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }
645
646  /**
647   * Execute an append mutation.
648   * @return result to return to client if default operation should be bypassed as indicated by
649   *         RegionObserver, null otherwise
650   */
651  private Result append(final HRegion region, final OperationQuota quota,
652    final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
653    ActivePolicyEnforcement spaceQuota) throws IOException {
654    long before = EnvironmentEdgeManager.currentTime();
655    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
656    checkCellSizeLimit(region, append);
657    spaceQuota.getPolicyEnforcement(region).check(append);
658    quota.addMutation(append);
659    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
660    Result r = region.append(append, nonceGroup, nonce);
661    if (server.getMetrics() != null) {
662      server.getMetrics().updateAppend(region, EnvironmentEdgeManager.currentTime() - before);
663    }
664    return r == null ? Result.EMPTY_RESULT : r;
665  }
666
667  /**
668   * Execute an increment mutation.
669   */
670  private Result increment(final HRegion region, final OperationQuota quota,
671    final MutationProto mutation, final CellScanner cells, long nonceGroup,
672    ActivePolicyEnforcement spaceQuota) throws IOException {
673    long before = EnvironmentEdgeManager.currentTime();
674    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
675    checkCellSizeLimit(region, increment);
676    spaceQuota.getPolicyEnforcement(region).check(increment);
677    quota.addMutation(increment);
678    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
679    Result r = region.increment(increment, nonceGroup, nonce);
680    final MetricsRegionServer metricsRegionServer = server.getMetrics();
681    if (metricsRegionServer != null) {
682      metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before);
683    }
684    return r == null ? Result.EMPTY_RESULT : r;
685  }
686
687  /**
688   * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
689   * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
690   * @param cellsToReturn Could be null. May be allocated in this method. This is what this method
691   *                      returns as a 'result'.
692   * @param closeCallBack the callback to be used with multigets
693   * @param context       the current RpcCallContext
694   * @return Return the <code>cellScanner</code> passed
695   */
696  private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
697    final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
698    final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
699    final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
700    ActivePolicyEnforcement spaceQuotaEnforcement) {
701    // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
702    // one at a time, we instead pass them in batch. Be aware that the corresponding
703    // ResultOrException instance that matches each Put or Delete is then added down in the
704    // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
705    // deferred/batched
706    List<ClientProtos.Action> mutations = null;
707    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
708    IOException sizeIOE = null;
709    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
710      ResultOrException.newBuilder();
711    boolean hasResultOrException = false;
712    for (ClientProtos.Action action : actions.getActionList()) {
713      hasResultOrException = false;
714      resultOrExceptionBuilder.clear();
715      try {
716        Result r = null;
717
718        if (
719          context != null && context.isRetryImmediatelySupported()
720            && (context.getResponseCellSize() > maxQuotaResultSize
721              || context.getResponseBlockSize() + context.getResponseExceptionSize()
722                  > maxQuotaResultSize)
723        ) {
724
725          // We're storing the exception since the exception and reason string won't
726          // change after the response size limit is reached.
727          if (sizeIOE == null) {
728            // We don't need the stack un-winding do don't throw the exception.
729            // Throwing will kill the JVM's JIT.
730            //
731            // Instead just create the exception and then store it.
732            sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
733              + context.getResponseCellSize() + " BlockSize: " + context.getResponseBlockSize());
734
735            // Only report the exception once since there's only one request that
736            // caused the exception. Otherwise this number will dominate the exceptions count.
737            rpcServer.getMetrics().exception(sizeIOE);
738          }
739
740          // Now that there's an exception is known to be created
741          // use it for the response.
742          //
743          // This will create a copy in the builder.
744          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
745          resultOrExceptionBuilder.setException(pair);
746          context.incrementResponseExceptionSize(pair.getSerializedSize());
747          resultOrExceptionBuilder.setIndex(action.getIndex());
748          builder.addResultOrException(resultOrExceptionBuilder.build());
749          skipCellsForMutation(action, cellScanner);
750          continue;
751        }
752        if (action.hasGet()) {
753          long before = EnvironmentEdgeManager.currentTime();
754          ClientProtos.Get pbGet = action.getGet();
755          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
756          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
757          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
758          // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
759          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
760            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
761              + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
762              + "reverse Scan.");
763          }
764          try {
765            Get get = ProtobufUtil.toGet(pbGet);
766            if (context != null) {
767              r = get(get, (region), closeCallBack, context);
768            } else {
769              r = region.get(get);
770            }
771          } finally {
772            final MetricsRegionServer metricsRegionServer = server.getMetrics();
773            if (metricsRegionServer != null) {
774              metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before);
775            }
776          }
777        } else if (action.hasServiceCall()) {
778          hasResultOrException = true;
779          Message result = execServiceOnRegion(region, action.getServiceCall());
780          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
781            ClientProtos.CoprocessorServiceResult.newBuilder();
782          resultOrExceptionBuilder.setServiceResult(serviceResultBuilder
783            .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName())
784              // TODO: Copy!!!
785              .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
786        } else if (action.hasMutation()) {
787          MutationType type = action.getMutation().getMutateType();
788          if (
789            type != MutationType.PUT && type != MutationType.DELETE && mutations != null
790              && !mutations.isEmpty()
791          ) {
792            // Flush out any Puts or Deletes already collected.
793            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
794              spaceQuotaEnforcement);
795            mutations.clear();
796          }
797          switch (type) {
798            case APPEND:
799              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
800                spaceQuotaEnforcement);
801              break;
802            case INCREMENT:
803              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
804                spaceQuotaEnforcement);
805              break;
806            case PUT:
807            case DELETE:
808              // Collect the individual mutations and apply in a batch
809              if (mutations == null) {
810                mutations = new ArrayList<>(actions.getActionCount());
811              }
812              mutations.add(action);
813              break;
814            default:
815              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
816          }
817        } else {
818          throw new HBaseIOException("Unexpected Action type");
819        }
820        if (r != null) {
821          ClientProtos.Result pbResult = null;
822          if (isClientCellBlockSupport(context)) {
823            pbResult = ProtobufUtil.toResultNoData(r);
824            // Hard to guess the size here. Just make a rough guess.
825            if (cellsToReturn == null) {
826              cellsToReturn = new ArrayList<>();
827            }
828            cellsToReturn.add(r);
829          } else {
830            pbResult = ProtobufUtil.toResult(r);
831          }
832          addSize(context, r);
833          hasResultOrException = true;
834          resultOrExceptionBuilder.setResult(pbResult);
835        }
836        // Could get to here and there was no result and no exception. Presumes we added
837        // a Put or Delete to the collecting Mutations List for adding later. In this
838        // case the corresponding ResultOrException instance for the Put or Delete will be added
839        // down in the doNonAtomicBatchOp method call rather than up here.
840      } catch (IOException ie) {
841        rpcServer.getMetrics().exception(ie);
842        hasResultOrException = true;
843        NameBytesPair pair = ResponseConverter.buildException(ie);
844        resultOrExceptionBuilder.setException(pair);
845        context.incrementResponseExceptionSize(pair.getSerializedSize());
846      }
847      if (hasResultOrException) {
848        // Propagate index.
849        resultOrExceptionBuilder.setIndex(action.getIndex());
850        builder.addResultOrException(resultOrExceptionBuilder.build());
851      }
852    }
853    // Finish up any outstanding mutations
854    if (!CollectionUtils.isEmpty(mutations)) {
855      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
856    }
857    return cellsToReturn;
858  }
859
860  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
861    if (r.maxCellSize > 0) {
862      CellScanner cells = m.cellScanner();
863      while (cells.advance()) {
864        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
865        if (size > r.maxCellSize) {
866          String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of "
867            + r.maxCellSize + " bytes";
868          LOG.debug(msg);
869          throw new DoNotRetryIOException(msg);
870        }
871      }
872    }
873  }
874
  /**
   * Execute a batch of mutations atomically (all-or-nothing), delegating to
   * {@link #doBatchOp} with {@code atomic=true}.
   */
  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Just throw the exception. The exception will be caught and then added to region-level
    // exception for RegionAction. Leaving the null to action result is ok since the null
    // result is viewed as failure by hbase client. And the region-level exception will be used
    // to replace the null result. see AsyncRequestFutureImpl#receiveMultiAction and
    // AsyncBatchRpcRetryingCaller#onComplete for more details.
    doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
  }
885
886  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
887    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
888    ActivePolicyEnforcement spaceQuotaEnforcement) {
889    try {
890      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
891        spaceQuotaEnforcement, false);
892    } catch (IOException e) {
893      // Set the exception for each action. The mutations in same RegionAction are group to
894      // different batch and then be processed individually. Hence, we don't set the region-level
895      // exception here for whole RegionAction.
896      for (Action mutation : mutations) {
897        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
898      }
899    }
900  }
901
902  /**
903   * Execute a list of mutations.
904   */
905  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
906    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
907    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
908    throws IOException {
909    Mutation[] mArray = new Mutation[mutations.size()];
910    long before = EnvironmentEdgeManager.currentTime();
911    boolean batchContainsPuts = false, batchContainsDelete = false;
912    try {
913      /**
914       * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions
915       * since mutation array may have been reoredered.In order to return the right result or
916       * exception to the corresponding actions, We need to know which action is the mutation belong
917       * to. We can't sort ClientProtos.Action array, since they are bonded to cellscanners.
918       */
919      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
920      int i = 0;
921      long nonce = HConstants.NO_NONCE;
922      for (ClientProtos.Action action : mutations) {
923        if (action.hasGet()) {
924          throw new DoNotRetryIOException(
925            "Atomic put and/or delete only, not a Get=" + action.getGet());
926        }
927        MutationProto m = action.getMutation();
928        Mutation mutation;
929        switch (m.getMutateType()) {
930          case PUT:
931            mutation = ProtobufUtil.toPut(m, cells);
932            batchContainsPuts = true;
933            break;
934
935          case DELETE:
936            mutation = ProtobufUtil.toDelete(m, cells);
937            batchContainsDelete = true;
938            break;
939
940          case INCREMENT:
941            mutation = ProtobufUtil.toIncrement(m, cells);
942            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
943            break;
944
945          case APPEND:
946            mutation = ProtobufUtil.toAppend(m, cells);
947            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
948            break;
949
950          default:
951            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
952        }
953        mutationActionMap.put(mutation, action);
954        mArray[i++] = mutation;
955        checkCellSizeLimit(region, mutation);
956        // Check if a space quota disallows this mutation
957        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
958        quota.addMutation(mutation);
959      }
960
961      if (!region.getRegionInfo().isMetaRegion()) {
962        server.getMemStoreFlusher().reclaimMemStoreMemory();
963      }
964
965      // HBASE-17924
966      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
967      // order is preserved as its expected from the client
968      if (!atomic) {
969        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
970      }
971
972      OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);
973
974      // When atomic is true, it indicates that the mutateRow API or the batch API with
975      // RowMutations is called. In this case, we need to merge the results of the
976      // Increment/Append operations if the mutations include those operations, and set the merged
977      // result to the first element of the ResultOrException list
978      if (atomic) {
979        List<ResultOrException> resultOrExceptions = new ArrayList<>();
980        List<Result> results = new ArrayList<>();
981        for (i = 0; i < codes.length; i++) {
982          if (codes[i].getResult() != null) {
983            results.add(codes[i].getResult());
984          }
985          if (i != 0) {
986            resultOrExceptions
987              .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i));
988          }
989        }
990
991        if (results.isEmpty()) {
992          builder.addResultOrException(
993            getResultOrException(ClientProtos.Result.getDefaultInstance(), 0));
994        } else {
995          // Merge the results of the Increment/Append operations
996          List<Cell> cellList = new ArrayList<>();
997          for (Result result : results) {
998            if (result.rawCells() != null) {
999              cellList.addAll(Arrays.asList(result.rawCells()));
1000            }
1001          }
1002          Result result = Result.create(cellList);
1003
1004          // Set the merged result of the Increment/Append operations to the first element of the
1005          // ResultOrException list
1006          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
1007        }
1008
1009        builder.addAllResultOrException(resultOrExceptions);
1010        return;
1011      }
1012
1013      for (i = 0; i < codes.length; i++) {
1014        Mutation currentMutation = mArray[i];
1015        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
1016        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
1017        Exception e;
1018        switch (codes[i].getOperationStatusCode()) {
1019          case BAD_FAMILY:
1020            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
1021            builder.addResultOrException(getResultOrException(e, index));
1022            break;
1023
1024          case SANITY_CHECK_FAILURE:
1025            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
1026            builder.addResultOrException(getResultOrException(e, index));
1027            break;
1028
1029          default:
1030            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
1031            builder.addResultOrException(getResultOrException(e, index));
1032            break;
1033
1034          case SUCCESS:
1035            builder.addResultOrException(
1036              getResultOrException(ClientProtos.Result.getDefaultInstance(), index));
1037            break;
1038
1039          case STORE_TOO_BUSY:
1040            e = new RegionTooBusyException(codes[i].getExceptionMsg());
1041            builder.addResultOrException(getResultOrException(e, index));
1042            break;
1043        }
1044      }
1045    } finally {
1046      int processedMutationIndex = 0;
1047      for (Action mutation : mutations) {
1048        // The non-null mArray[i] means the cell scanner has been read.
1049        if (mArray[processedMutationIndex++] == null) {
1050          skipCellsForMutation(mutation, cells);
1051        }
1052      }
1053      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1054    }
1055  }
1056
1057  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
1058    boolean batchContainsDelete) {
1059    final MetricsRegionServer metricsRegionServer = server.getMetrics();
1060    if (metricsRegionServer != null) {
1061      long after = EnvironmentEdgeManager.currentTime();
1062      if (batchContainsPuts) {
1063        metricsRegionServer.updatePutBatch(region, after - starttime);
1064      }
1065      if (batchContainsDelete) {
1066        metricsRegionServer.updateDeleteBatch(region, after - starttime);
1067      }
1068    }
1069  }
1070
1071  /**
1072   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
1073   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
1074   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
1075   *         exceptionMessage if any
1076   * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying
1077   *             edits for secondary replicas any more, see
1078   *             {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}.
1079   */
1080  @Deprecated
1081  private OperationStatus[] doReplayBatchOp(final HRegion region,
1082    final List<MutationReplay> mutations, long replaySeqId) throws IOException {
1083    long before = EnvironmentEdgeManager.currentTime();
1084    boolean batchContainsPuts = false, batchContainsDelete = false;
1085    try {
1086      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
1087        MutationReplay m = it.next();
1088
1089        if (m.getType() == MutationType.PUT) {
1090          batchContainsPuts = true;
1091        } else {
1092          batchContainsDelete = true;
1093        }
1094
1095        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
1096        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
1097        if (metaCells != null && !metaCells.isEmpty()) {
1098          for (Cell metaCell : metaCells) {
1099            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
1100            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1101            HRegion hRegion = region;
1102            if (compactionDesc != null) {
1103              // replay the compaction. Remove the files from stores only if we are the primary
1104              // region replica (thus own the files)
1105              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
1106                replaySeqId);
1107              continue;
1108            }
1109            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
1110            if (flushDesc != null && !isDefaultReplica) {
1111              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
1112              continue;
1113            }
1114            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
1115            if (regionEvent != null && !isDefaultReplica) {
1116              hRegion.replayWALRegionEventMarker(regionEvent);
1117              continue;
1118            }
1119            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
1120            if (bulkLoadEvent != null) {
1121              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
1122              continue;
1123            }
1124          }
1125          it.remove();
1126        }
1127      }
1128      requestCount.increment();
1129      if (!region.getRegionInfo().isMetaRegion()) {
1130        server.getMemStoreFlusher().reclaimMemStoreMemory();
1131      }
1132      return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
1133        replaySeqId);
1134    } finally {
1135      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1136    }
1137  }
1138
1139  private void closeAllScanners() {
1140    // Close any outstanding scanners. Means they'll get an UnknownScanner
1141    // exception next time they come in.
1142    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
1143      try {
1144        e.getValue().s.close();
1145      } catch (IOException ioe) {
1146        LOG.warn("Closing scanner " + e.getKey(), ioe);
1147      }
1148    }
1149  }
1150
  // Directly invoked only for testing
  public RSRpcServices(final HRegionServer rs) throws IOException {
    super(rs, rs.getProcessName());
    final Configuration conf = rs.getConfiguration();
    // Row-count threshold for a single batch; used together with the reject flag below.
    rowSizeWarnThreshold =
      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    // Whether batches over the threshold are rejected outright.
    rejectRowsWithSizeOverThreshold =
      conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
    rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
    rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
    // Entries expire after one scanner-lease period; presumably so late calls against a closed
    // scanner can be distinguished from an unknown one — confirm against the scan RPC path.
    closedScanners = CacheBuilder.newBuilder()
      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }
1171
  /** Region servers enable the byte buffer reservoir by default. */
  @Override
  protected boolean defaultReservoirEnabled() {
    return true;
  }
1176
  /** Returns the DNS server type used for hostname resolution: REGIONSERVER. */
  @Override
  protected ServerType getDNSServerType() {
    return DNS.ServerType.REGIONSERVER;
  }
1181
  /**
   * Returns the hostname this server should bind to, preferring the explicit
   * "hbase.regionserver.ipc.address" setting over the supplied default.
   */
  @Override
  protected String getHostname(Configuration conf, String defaultHostname) {
    return conf.get("hbase.regionserver.ipc.address", defaultHostname);
  }
1186
  /** Returns the configuration key that holds the region server port. */
  @Override
  protected String getPortConfigName() {
    return HConstants.REGIONSERVER_PORT;
  }
1191
  /** Returns the default region server port used when none is configured. */
  @Override
  protected int getDefaultPort() {
    return HConstants.DEFAULT_REGIONSERVER_PORT;
  }
1196
  /**
   * Returns the configured RPC scheduler factory class, defaulting to
   * {@link SimpleRpcSchedulerFactory}.
   */
  @Override
  protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }
1202
1203  protected RpcServerInterface createRpcServer(final Server server,
1204    final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
1205    final String name) throws IOException {
1206    final Configuration conf = server.getConfiguration();
1207    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
1208    try {
1209      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, // use final
1210                                                                                        // bindAddress
1211                                                                                        // for this
1212                                                                                        // server.
1213        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
1214    } catch (BindException be) {
1215      throw new IOException(be.getMessage() + ". To switch ports use the '"
1216        + HConstants.REGIONSERVER_PORT + "' configuration property.",
1217        be.getCause() != null ? be.getCause() : be);
1218    }
1219  }
1220
1221  protected Class<?> getRpcSchedulerFactoryClass() {
1222    final Configuration conf = server.getConfiguration();
1223    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
1224      SimpleRpcSchedulerFactory.class);
1225  }
1226
  /** Creates the annotation-reading priority function for this RPC services instance. */
  protected PriorityFunction createPriority() {
    return new RSAnnotationReadingPriorityFunction(this);
  }
1230
  /** Returns the number of region scanners currently registered on this server. */
  public int getScannersCount() {
    return scanners.size();
  }
1234
1235  /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
1236  RegionScanner getScanner(long scannerId) {
1237    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
1238    return rsh == null ? null : rsh.s;
1239  }
1240
  /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null. */
  private RegionScannerHolder getRegionScannerHolder(long scannerId) {
    // Scanners are keyed by the string form of their id; see toScannerName.
    return scanners.get(toScannerName(scannerId));
  }
1245
1246  public String getScanDetailsWithId(long scannerId) {
1247    RegionScanner scanner = getScanner(scannerId);
1248    if (scanner == null) {
1249      return null;
1250    }
1251    StringBuilder builder = new StringBuilder();
1252    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
1253    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
1254    builder.append(" operation_id: ").append(scanner.getOperationId());
1255    return builder.toString();
1256  }
1257
1258  public String getScanDetailsWithRequest(ScanRequest request) {
1259    try {
1260      if (!request.hasRegion()) {
1261        return null;
1262      }
1263      Region region = getRegion(request.getRegion());
1264      StringBuilder builder = new StringBuilder();
1265      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
1266      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
1267      for (NameBytesPair pair : request.getScan().getAttributeList()) {
1268        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
1269          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
1270          break;
1271        }
1272      }
1273      return builder.toString();
1274    } catch (IOException ignored) {
1275      return null;
1276    }
1277  }
1278
1279  /**
1280   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
1281   */
1282  long getScannerVirtualTime(long scannerId) {
1283    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
1284    return rsh == null ? 0L : rsh.getNextCallSeq();
1285  }
1286
1287  /**
1288   * Method to account for the size of retained cells.
1289   * @param context rpc call context
1290   * @param r       result to add size.
1291   * @return an object that represents the last referenced block from this response.
1292   */
1293  void addSize(RpcCallContext context, Result r) {
1294    if (context != null && r != null && !r.isEmpty()) {
1295      for (Cell c : r.rawCells()) {
1296        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
1297      }
1298    }
1299  }
1300
1301  /** Returns Remote client's ip and port else null if can't be determined. */
1302  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
1303      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
1304  static String getRemoteClientIpAndPort() {
1305    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
1306    if (rpcCall == null) {
1307      return HConstants.EMPTY_STRING;
1308    }
1309    InetAddress address = rpcCall.getRemoteAddress();
1310    if (address == null) {
1311      return HConstants.EMPTY_STRING;
1312    }
1313    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
1314    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
1315    // scanning than a hostname anyways.
1316    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
1317  }
1318
1319  /** Returns Remote client's username. */
1320  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
1321      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
1322  static String getUserName() {
1323    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
1324    if (rpcCall == null) {
1325      return HConstants.EMPTY_STRING;
1326    }
1327    return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING);
1328  }
1329
1330  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
1331    HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
1332    Lease lease = server.getLeaseManager().createLease(scannerName, this.scannerLeaseTimeoutPeriod,
1333      new ScannerListener(scannerName));
1334    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
1335    RpcCallback closeCallback =
1336      s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s);
1337    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
1338      needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
1339    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
1340    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
1341      + scannerName + ", " + existing;
1342    return rsh;
1343  }
1344
1345  private boolean isFullRegionScan(Scan scan, HRegion region) {
1346    // If the scan start row equals or less than the start key of the region
1347    // and stop row greater than equals end key (if stop row present)
1348    // or if the stop row is empty
1349    // account this as a full region scan
1350    if (
1351      Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0
1352        && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0
1353          && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)
1354          || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
1355    ) {
1356      return true;
1357    }
1358    return false;
1359  }
1360
1361  /**
1362   * Find the HRegion based on a region specifier
1363   * @param regionSpecifier the region specifier
1364   * @return the corresponding region
1365   * @throws IOException if the specifier is not null, but failed to find the region
1366   */
1367  public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException {
1368    return server.getRegion(regionSpecifier.getValue().toByteArray());
1369  }
1370
1371  /**
1372   * Find the List of HRegions based on a list of region specifiers
1373   * @param regionSpecifiers the list of region specifiers
1374   * @return the corresponding list of regions
1375   * @throws IOException if any of the specifiers is not null, but failed to find the region
1376   */
1377  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
1378    final CacheEvictionStatsBuilder stats) {
1379    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
1380    for (RegionSpecifier regionSpecifier : regionSpecifiers) {
1381      try {
1382        regions.add(server.getRegion(regionSpecifier.getValue().toByteArray()));
1383      } catch (NotServingRegionException e) {
1384        stats.addException(regionSpecifier.getValue().toByteArray(), e);
1385      }
1386    }
1387    return regions;
1388  }
1389
  /** Returns the {@link PriorityFunction} used to prioritize this server's incoming RPCs. */
  PriorityFunction getPriority() {
    return priority;
  }
1393
  /** Returns the server's RPC quota manager. */
  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return server.getRegionServerRpcQuotaManager();
  }
1397
  /** Returns the server's space quota manager. */
  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return server.getRegionServerSpaceQuotaManager();
  }
1401
  /**
   * Start the RPC services: create the scanner id generator (built from this server's name) and
   * then run subclass-specific startup via {@code internalStart}.
   */
  void start(ZKWatcher zkWatcher) {
    this.scannerIdGenerator = new ScannerIdGenerator(this.server.getServerName());
    internalStart(zkWatcher);
  }
1406
  /** Stop the RPC services: close all open region scanners, then run subclass-specific shutdown. */
  void stop() {
    closeAllScanners();
    internalStop();
  }
1411
1412  /**
1413   * Called to verify that this server is up and running.
1414   */
1415  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
1416  protected void checkOpen() throws IOException {
1417    if (server.isAborted()) {
1418      throw new RegionServerAbortedException("Server " + server.getServerName() + " aborting");
1419    }
1420    if (server.isStopped()) {
1421      throw new RegionServerStoppedException("Server " + server.getServerName() + " stopping");
1422    }
1423    if (!server.isDataFileSystemOk()) {
1424      throw new RegionServerStoppedException("File system not available");
1425    }
1426    if (!server.isOnline()) {
1427      throw new ServerNotRunningYetException(
1428        "Server " + server.getServerName() + " is not running yet");
1429    }
1430  }
1431
1432  /**
1433   * By default, put up an Admin and a Client Service. Set booleans
1434   * <code>hbase.regionserver.admin.executorService</code> and
1435   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
1436   * Default is that both are enabled.
1437   * @return immutable list of blocking services and the security info classes that this server
1438   *         supports
1439   */
1440  protected List<BlockingServiceAndInterface> getServices() {
1441    boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
1442    boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
1443    boolean clientMeta =
1444      getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
1445    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
1446    if (client) {
1447      bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
1448        ClientService.BlockingInterface.class));
1449    }
1450    if (admin) {
1451      bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
1452        AdminService.BlockingInterface.class));
1453    }
1454    if (clientMeta) {
1455      bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
1456        ClientMetaService.BlockingInterface.class));
1457    }
1458    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
1459  }
1460
1461  /**
1462   * Close a region on the region server.
1463   * @param controller the RPC controller
1464   * @param request    the request
1465   */
1466  @Override
1467  @QosPriority(priority = HConstants.ADMIN_QOS)
1468  public CloseRegionResponse closeRegion(final RpcController controller,
1469    final CloseRegionRequest request) throws ServiceException {
1470    final ServerName sn = (request.hasDestinationServer()
1471      ? ProtobufUtil.toServerName(request.getDestinationServer())
1472      : null);
1473
1474    try {
1475      checkOpen();
1476      throwOnWrongStartCode(request);
1477      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1478
1479      requestCount.increment();
1480      if (sn == null) {
1481        LOG.info("Close " + encodedRegionName + " without moving");
1482      } else {
1483        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1484      }
1485      boolean closed = server.closeRegion(encodedRegionName, false, sn);
1486      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1487      return builder.build();
1488    } catch (IOException ie) {
1489      throw new ServiceException(ie);
1490    }
1491  }
1492
1493  /**
1494   * Compact a region on the region server.
1495   * @param controller the RPC controller
1496   * @param request    the request
1497   */
1498  @Override
1499  @QosPriority(priority = HConstants.ADMIN_QOS)
1500  public CompactRegionResponse compactRegion(final RpcController controller,
1501    final CompactRegionRequest request) throws ServiceException {
1502    try {
1503      checkOpen();
1504      requestCount.increment();
1505      HRegion region = getRegion(request.getRegion());
1506      // Quota support is enabled, the requesting user is not system/super user
1507      // and a quota policy is enforced that disables compactions.
1508      if (
1509        QuotaUtil.isQuotaEnabled(getConfiguration())
1510          && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null))
1511          && this.server.getRegionServerSpaceQuotaManager()
1512            .areCompactionsDisabled(region.getTableDescriptor().getTableName())
1513      ) {
1514        throw new DoNotRetryIOException(
1515          "Compactions on this region are " + "disabled due to a space quota violation.");
1516      }
1517      region.startRegionOperation(Operation.COMPACT_REGION);
1518      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1519      boolean major = request.hasMajor() && request.getMajor();
1520      if (request.hasFamily()) {
1521        byte[] family = request.getFamily().toByteArray();
1522        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
1523          + region.getRegionInfo().getRegionNameAsString() + " and family "
1524          + Bytes.toString(family);
1525        LOG.trace(log);
1526        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
1527          CompactionLifeCycleTracker.DUMMY);
1528      } else {
1529        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
1530          + region.getRegionInfo().getRegionNameAsString();
1531        LOG.trace(log);
1532        region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
1533      }
1534      return CompactRegionResponse.newBuilder().build();
1535    } catch (IOException ie) {
1536      throw new ServiceException(ie);
1537    }
1538  }
1539
1540  @Override
1541  public CompactionSwitchResponse compactionSwitch(RpcController controller,
1542    CompactionSwitchRequest request) throws ServiceException {
1543    rpcPreCheck("compactionSwitch");
1544    final CompactSplit compactSplitThread = server.getCompactSplitThread();
1545    requestCount.increment();
1546    boolean prevState = compactSplitThread.isCompactionsEnabled();
1547    CompactionSwitchResponse response =
1548      CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
1549    if (prevState == request.getEnabled()) {
1550      // passed in requested state is same as current state. No action required
1551      return response;
1552    }
1553    compactSplitThread.switchCompaction(request.getEnabled());
1554    return response;
1555  }
1556
1557  /**
1558   * Flush a region on the region server.
1559   * @param controller the RPC controller
1560   * @param request    the request
1561   */
1562  @Override
1563  @QosPriority(priority = HConstants.ADMIN_QOS)
1564  public FlushRegionResponse flushRegion(final RpcController controller,
1565    final FlushRegionRequest request) throws ServiceException {
1566    try {
1567      checkOpen();
1568      requestCount.increment();
1569      HRegion region = getRegion(request.getRegion());
1570      LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1571      boolean shouldFlush = true;
1572      if (request.hasIfOlderThanTs()) {
1573        shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1574      }
1575      FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1576      if (shouldFlush) {
1577        boolean writeFlushWalMarker =
1578          request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false;
1579        // Go behind the curtain so we can manage writing of the flush WAL marker
1580        HRegion.FlushResultImpl flushResult = null;
1581        if (request.hasFamily()) {
1582          List families = new ArrayList();
1583          families.add(request.getFamily().toByteArray());
1584          flushResult =
1585            region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1586        } else {
1587          flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1588        }
1589        boolean compactionNeeded = flushResult.isCompactionNeeded();
1590        if (compactionNeeded) {
1591          server.getCompactSplitThread().requestSystemCompaction(region,
1592            "Compaction through user triggered flush");
1593        }
1594        builder.setFlushed(flushResult.isFlushSucceeded());
1595        builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1596      }
1597      builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1598      return builder.build();
1599    } catch (DroppedSnapshotException ex) {
1600      // Cache flush can fail in a few places. If it fails in a critical
1601      // section, we get a DroppedSnapshotException and a replay of wal
1602      // is required. Currently the only way to do this is a restart of
1603      // the server.
1604      server.abort("Replay of WAL required. Forcing server shutdown", ex);
1605      throw new ServiceException(ex);
1606    } catch (IOException ie) {
1607      throw new ServiceException(ie);
1608    }
1609  }
1610
1611  @Override
1612  @QosPriority(priority = HConstants.ADMIN_QOS)
1613  public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1614    final GetOnlineRegionRequest request) throws ServiceException {
1615    try {
1616      checkOpen();
1617      requestCount.increment();
1618      Map<String, HRegion> onlineRegions = server.getOnlineRegions();
1619      List<RegionInfo> list = new ArrayList<>(onlineRegions.size());
1620      for (HRegion region : onlineRegions.values()) {
1621        list.add(region.getRegionInfo());
1622      }
1623      list.sort(RegionInfo.COMPARATOR);
1624      return ResponseConverter.buildGetOnlineRegionResponse(list);
1625    } catch (IOException ie) {
1626      throw new ServiceException(ie);
1627    }
1628  }
1629
  // Master implementation of this Admin Service differs given it is not
  // able to supply detail only known to RegionServer. See note on
  // MasterRpcServers#getRegionInfo.
  /**
   * Return region metadata, optionally including compaction state and the best split row.
   * <p>
   * When the best split row is requested and the first {@code checkSplit} yields nothing (e.g. all
   * data still in the memstore), the region is flushed once and the split check retried.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
    final GetRegionInfoRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      RegionInfo info = region.getRegionInfo();
      byte[] bestSplitRow;
      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
        bestSplitRow = region.checkSplit(true).orElse(null);
        // when all table data are in memstore, bestSplitRow = null
        // try to flush region first
        if (bestSplitRow == null) {
          region.flush(true);
          bestSplitRow = region.checkSplit(true).orElse(null);
        }
      } else {
        bestSplitRow = null;
      }
      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
      if (request.hasCompactionState() && request.getCompactionState()) {
        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
      }
      builder.setSplittable(region.isSplittable());
      builder.setMergeable(region.isMergeable());
      // bestSplitRow can still be null here if even the post-flush split check found nothing.
      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }
1669
1670  @Override
1671  @QosPriority(priority = HConstants.ADMIN_QOS)
1672  public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
1673    throws ServiceException {
1674
1675    List<HRegion> regions;
1676    if (request.hasTableName()) {
1677      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
1678      regions = server.getRegions(tableName);
1679    } else {
1680      regions = server.getRegions();
1681    }
1682    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
1683    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
1684    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1685
1686    try {
1687      for (HRegion region : regions) {
1688        rLoads.add(server.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
1689      }
1690    } catch (IOException e) {
1691      throw new ServiceException(e);
1692    }
1693    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
1694    builder.addAllRegionLoads(rLoads);
1695    return builder.build();
1696  }
1697
  /**
   * Clear the named compaction queues ("long" and/or "short") on this server.
   * <p>
   * Only one clear operation may run at a time; concurrent requests are logged and ignored.
   * An unknown queue name fails the whole request.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
    ClearCompactionQueuesRequest request) throws ServiceException {
    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
      + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
    requestCount.increment();
    // CAS flag serializes clears; reset in the finally below once this attempt completes.
    if (clearCompactionQueues.compareAndSet(false, true)) {
      final CompactSplit compactSplitThread = server.getCompactSplitThread();
      try {
        checkOpen();
        server.getRegionServerCoprocessorHost().preClearCompactionQueues();
        for (String queueName : request.getQueueNameList()) {
          LOG.debug("clear " + queueName + " compaction queue");
          switch (queueName) {
            case "long":
              compactSplitThread.clearLongCompactionsQueue();
              break;
            case "short":
              compactSplitThread.clearShortCompactionsQueue();
              break;
            default:
              LOG.warn("Unknown queue name " + queueName);
              throw new IOException("Unknown queue name " + queueName);
          }
        }
        server.getRegionServerCoprocessorHost().postClearCompactionQueues();
      } catch (IOException ie) {
        throw new ServiceException(ie);
      } finally {
        clearCompactionQueues.set(false);
      }
    } else {
      LOG.warn("Clear compactions queue is executing by other admin.");
    }
    return respBuilder.build();
  }
1736
1737  /**
1738   * Get some information of the region server.
1739   * @param controller the RPC controller
1740   * @param request    the request
1741   */
1742  @Override
1743  @QosPriority(priority = HConstants.ADMIN_QOS)
1744  public GetServerInfoResponse getServerInfo(final RpcController controller,
1745    final GetServerInfoRequest request) throws ServiceException {
1746    try {
1747      checkOpen();
1748    } catch (IOException ie) {
1749      throw new ServiceException(ie);
1750    }
1751    requestCount.increment();
1752    int infoPort = server.getInfoServer() != null ? server.getInfoServer().getPort() : -1;
1753    return ResponseConverter.buildGetServerInfoResponse(server.getServerName(), infoPort);
1754  }
1755
1756  @Override
1757  @QosPriority(priority = HConstants.ADMIN_QOS)
1758  public GetStoreFileResponse getStoreFile(final RpcController controller,
1759    final GetStoreFileRequest request) throws ServiceException {
1760    try {
1761      checkOpen();
1762      HRegion region = getRegion(request.getRegion());
1763      requestCount.increment();
1764      Set<byte[]> columnFamilies;
1765      if (request.getFamilyCount() == 0) {
1766        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
1767      } else {
1768        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
1769        for (ByteString cf : request.getFamilyList()) {
1770          columnFamilies.add(cf.toByteArray());
1771        }
1772      }
1773      int nCF = columnFamilies.size();
1774      List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
1775      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1776      builder.addAllStoreFile(fileList);
1777      return builder.build();
1778    } catch (IOException ie) {
1779      throw new ServiceException(ie);
1780    }
1781  }
1782
1783  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
1784    if (!request.hasServerStartCode()) {
1785      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
1786      return;
1787    }
1788    throwOnWrongStartCode(request.getServerStartCode());
1789  }
1790
1791  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
1792    if (!request.hasServerStartCode()) {
1793      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
1794      return;
1795    }
1796    throwOnWrongStartCode(request.getServerStartCode());
1797  }
1798
1799  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
1800    // check that we are the same server that this RPC is intended for.
1801    if (server.getServerName().getStartcode() != serverStartCode) {
1802      throw new ServiceException(new DoNotRetryIOException(
1803        "This RPC was intended for a " + "different server with startCode: " + serverStartCode
1804          + ", this server is: " + server.getServerName()));
1805    }
1806  }
1807
1808  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
1809    if (req.getOpenRegionCount() > 0) {
1810      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
1811        throwOnWrongStartCode(openReq);
1812      }
1813    }
1814    if (req.getCloseRegionCount() > 0) {
1815      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
1816        throwOnWrongStartCode(closeReq);
1817      }
1818    }
1819  }
1820
1821  /**
1822   * Open asynchronously a region or a set of regions on the region server. The opening is
1823   * coordinated by ZooKeeper, and this method requires the znode to be created before being called.
1824   * As a consequence, this method should be called only from the master.
1825   * <p>
1826   * Different manages states for the region are:
1827   * </p>
1828   * <ul>
1829   * <li>region not opened: the region opening will start asynchronously.</li>
1830   * <li>a close is already in progress: this is considered as an error.</li>
1831   * <li>an open is already in progress: this new open request will be ignored. This is important
1832   * because the Master can do multiple requests if it crashes.</li>
1833   * <li>the region is already opened: this new open request will be ignored.</li>
1834   * </ul>
1835   * <p>
1836   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
1837   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
1838   * errors are put in the response as FAILED_OPENING.
1839   * </p>
1840   * @param controller the RPC controller
1841   * @param request    the request
1842   */
1843  @Override
1844  @QosPriority(priority = HConstants.ADMIN_QOS)
1845  public OpenRegionResponse openRegion(final RpcController controller,
1846    final OpenRegionRequest request) throws ServiceException {
1847    requestCount.increment();
1848    throwOnWrongStartCode(request);
1849
1850    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
1851    final int regionCount = request.getOpenInfoCount();
1852    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
1853    final boolean isBulkAssign = regionCount > 1;
1854    try {
1855      checkOpen();
1856    } catch (IOException ie) {
1857      TableName tableName = null;
1858      if (regionCount == 1) {
1859        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
1860          request.getOpenInfo(0).getRegion();
1861        if (ri != null) {
1862          tableName = ProtobufUtil.toTableName(ri.getTableName());
1863        }
1864      }
1865      if (!TableName.META_TABLE_NAME.equals(tableName)) {
1866        throw new ServiceException(ie);
1867      }
1868      // We are assigning meta, wait a little for regionserver to finish initialization.
1869      // Default to quarter of RPC timeout
1870      int timeout = server.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1871        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
1872      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
1873      synchronized (server.online) {
1874        try {
1875          while (
1876            EnvironmentEdgeManager.currentTime() <= endTime && !server.isStopped()
1877              && !server.isOnline()
1878          ) {
1879            server.online.wait(server.getMsgInterval());
1880          }
1881          checkOpen();
1882        } catch (InterruptedException t) {
1883          Thread.currentThread().interrupt();
1884          throw new ServiceException(t);
1885        } catch (IOException e) {
1886          throw new ServiceException(e);
1887        }
1888      }
1889    }
1890
1891    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1892
1893    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
1894      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
1895      TableDescriptor htd;
1896      try {
1897        String encodedName = region.getEncodedName();
1898        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
1899        final HRegion onlineRegion = server.getRegion(encodedName);
1900        if (onlineRegion != null) {
1901          // The region is already online. This should not happen any more.
1902          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
1903            + ", which is already online";
1904          LOG.warn(error);
1905          // server.abort(error);
1906          // throw new IOException(error);
1907          builder.addOpeningState(RegionOpeningState.OPENED);
1908          continue;
1909        }
1910        LOG.info("Open " + region.getRegionNameAsString());
1911
1912        final Boolean previous =
1913          server.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);
1914
1915        if (Boolean.FALSE.equals(previous)) {
1916          if (server.getRegion(encodedName) != null) {
1917            // There is a close in progress. This should not happen any more.
1918            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
1919              + ", which we are already trying to CLOSE";
1920            server.abort(error);
1921            throw new IOException(error);
1922          }
1923          server.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
1924        }
1925
1926        if (Boolean.TRUE.equals(previous)) {
1927          // An open is in progress. This is supported, but let's log this.
1928          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
1929            + ", which we are already trying to OPEN"
1930            + " - ignoring this new request for this region.");
1931        }
1932
1933        // We are opening this region. If it moves back and forth for whatever reason, we don't
1934        // want to keep returning the stale moved record while we are opening/if we close again.
1935        server.removeFromMovedRegions(region.getEncodedName());
1936
1937        if (previous == null || !previous.booleanValue()) {
1938          htd = htds.get(region.getTable());
1939          if (htd == null) {
1940            htd = server.getTableDescriptors().get(region.getTable());
1941            htds.put(region.getTable(), htd);
1942          }
1943          if (htd == null) {
1944            throw new IOException("Missing table descriptor for " + region.getEncodedName());
1945          }
1946          // If there is no action in progress, we can submit a specific handler.
1947          // Need to pass the expected version in the constructor.
1948          if (server.getExecutorService() == null) {
1949            LOG.info("No executor executorService; skipping open request");
1950          } else {
1951            if (region.isMetaRegion()) {
1952              server.getExecutorService()
1953                .submit(new OpenMetaHandler(server, server, region, htd, masterSystemTime));
1954            } else {
1955              if (regionOpenInfo.getFavoredNodesCount() > 0) {
1956                server.updateRegionFavoredNodesMapping(region.getEncodedName(),
1957                  regionOpenInfo.getFavoredNodesList());
1958              }
1959              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
1960                server.getExecutorService().submit(
1961                  new OpenPriorityRegionHandler(server, server, region, htd, masterSystemTime));
1962              } else {
1963                server.getExecutorService()
1964                  .submit(new OpenRegionHandler(server, server, region, htd, masterSystemTime));
1965              }
1966            }
1967          }
1968        }
1969
1970        builder.addOpeningState(RegionOpeningState.OPENED);
1971      } catch (IOException ie) {
1972        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
1973        if (isBulkAssign) {
1974          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
1975        } else {
1976          throw new ServiceException(ie);
1977        }
1978      }
1979    }
1980    return builder.build();
1981  }
1982
1983  /**
1984   * Warmup a region on this server. This method should only be called by Master. It synchronously
1985   * opens the region and closes the region bringing the most important pages in cache.
1986   */
1987  @Override
1988  public WarmupRegionResponse warmupRegion(final RpcController controller,
1989    final WarmupRegionRequest request) throws ServiceException {
1990    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
1991    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
1992    try {
1993      checkOpen();
1994      String encodedName = region.getEncodedName();
1995      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
1996      final HRegion onlineRegion = server.getRegion(encodedName);
1997      if (onlineRegion != null) {
1998        LOG.info("{} is online; skipping warmup", region);
1999        return response;
2000      }
2001      TableDescriptor htd = server.getTableDescriptors().get(region.getTable());
2002      if (server.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
2003        LOG.info("{} is in transition; skipping warmup", region);
2004        return response;
2005      }
2006      LOG.info("Warmup {}", region.getRegionNameAsString());
2007      HRegion.warmupHRegion(region, htd, server.getWAL(region), server.getConfiguration(), server,
2008        null);
2009    } catch (IOException ie) {
2010      LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
2011      throw new ServiceException(ie);
2012    }
2013
2014    return response;
2015  }
2016
2017  private CellScanner getAndReset(RpcController controller) {
2018    HBaseRpcController hrc = (HBaseRpcController) controller;
2019    CellScanner cells = hrc.cellScanner();
2020    hrc.setCellScanner(null);
2021    return cells;
2022  }
2023
2024  /**
2025   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
2026   * that the given mutations will be durable on the receiving RS if this method returns without any
2027   * exception.
2028   * @param controller the RPC controller
2029   * @param request    the request
2030   * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for
2031   *             compatibility with old region replica implementation. Now we will use
2032   *             {@code replicateToReplica} method instead.
2033   */
2034  @Deprecated
2035  @Override
2036  @QosPriority(priority = HConstants.REPLAY_QOS)
2037  public ReplicateWALEntryResponse replay(final RpcController controller,
2038    final ReplicateWALEntryRequest request) throws ServiceException {
2039    long before = EnvironmentEdgeManager.currentTime();
2040    CellScanner cells = getAndReset(controller);
2041    try {
2042      checkOpen();
2043      List<WALEntry> entries = request.getEntryList();
2044      if (entries == null || entries.isEmpty()) {
2045        // empty input
2046        return ReplicateWALEntryResponse.newBuilder().build();
2047      }
2048      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
2049      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
2050      RegionCoprocessorHost coprocessorHost =
2051        ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
2052          ? region.getCoprocessorHost()
2053          : null; // do not invoke coprocessors if this is a secondary region replica
2054      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();
2055
2056      // Skip adding the edits to WAL if this is a secondary region replica
2057      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
2058      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
2059
2060      for (WALEntry entry : entries) {
2061        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
2062          throw new NotServingRegionException("Replay request contains entries from multiple "
2063            + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
2064            + entry.getKey().getEncodedRegionName());
2065        }
2066        if (server.nonceManager != null && isPrimary) {
2067          long nonceGroup =
2068            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
2069          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
2070          server.nonceManager.reportOperationFromWal(nonceGroup, nonce,
2071            entry.getKey().getWriteTime());
2072        }
2073        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
2074        List<MutationReplay> edits =
2075          WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
2076        if (coprocessorHost != null) {
2077          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
2078          // KeyValue.
2079          if (
2080            coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
2081              walEntry.getSecond())
2082          ) {
2083            // if bypass this log entry, ignore it ...
2084            continue;
2085          }
2086          walEntries.add(walEntry);
2087        }
2088        if (edits != null && !edits.isEmpty()) {
2089          // HBASE-17924
2090          // sort to improve lock efficiency
2091          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
2092          long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
2093            ? entry.getKey().getOrigSequenceNumber()
2094            : entry.getKey().getLogSequenceNumber();
2095          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
2096          // check if it's a partial success
2097          for (int i = 0; result != null && i < result.length; i++) {
2098            if (result[i] != OperationStatus.SUCCESS) {
2099              throw new IOException(result[i].getExceptionMsg());
2100            }
2101          }
2102        }
2103      }
2104
2105      // sync wal at the end because ASYNC_WAL is used above
2106      WAL wal = region.getWAL();
2107      if (wal != null) {
2108        wal.sync();
2109      }
2110
2111      if (coprocessorHost != null) {
2112        for (Pair<WALKey, WALEdit> entry : walEntries) {
2113          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
2114            entry.getSecond());
2115        }
2116      }
2117      return ReplicateWALEntryResponse.newBuilder().build();
2118    } catch (IOException ie) {
2119      throw new ServiceException(ie);
2120    } finally {
2121      final MetricsRegionServer metricsRegionServer = server.getMetrics();
2122      if (metricsRegionServer != null) {
2123        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
2124      }
2125    }
2126  }
2127
2128  /**
2129   * Replay the given changes on a secondary replica
2130   */
2131  @Override
2132  public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
2133    ReplicateWALEntryRequest request) throws ServiceException {
2134    CellScanner cells = getAndReset(controller);
2135    try {
2136      checkOpen();
2137      List<WALEntry> entries = request.getEntryList();
2138      if (entries == null || entries.isEmpty()) {
2139        // empty input
2140        return ReplicateWALEntryResponse.newBuilder().build();
2141      }
2142      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
2143      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
2144      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2145        throw new DoNotRetryIOException(
2146          "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
2147      }
2148      for (WALEntry entry : entries) {
2149        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
2150          throw new NotServingRegionException(
2151            "ReplicateToReplica request contains entries from multiple " + "regions. First region:"
2152              + regionName.toStringUtf8() + " , other region:"
2153              + entry.getKey().getEncodedRegionName());
2154        }
2155        region.replayWALEntry(entry, cells);
2156      }
2157      return ReplicateWALEntryResponse.newBuilder().build();
2158    } catch (IOException ie) {
2159      throw new ServiceException(ie);
2160    }
2161  }
2162
2163  private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
2164    ReplicationSourceService replicationSource = server.getReplicationSourceService();
2165    if (replicationSource == null || entries.isEmpty()) {
2166      return;
2167    }
2168    // We can ensure that all entries are for one peer, so only need to check one entry's
2169    // table name. if the table hit sync replication at peer side and the peer cluster
2170    // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply
2171    // those entries according to the design doc.
2172    TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray());
2173    if (
2174      replicationSource.getSyncReplicationPeerInfoProvider().checkState(table,
2175        RejectReplicationRequestStateChecker.get())
2176    ) {
2177      throw new DoNotRetryIOException(
2178        "Reject to apply to sink cluster because sync replication state of sink cluster "
2179          + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table);
2180    }
2181  }
2182
2183  /**
2184   * Replicate WAL entries on the region server.
2185   * @param controller the RPC controller
2186   * @param request    the request
2187   */
2188  @Override
2189  @QosPriority(priority = HConstants.REPLICATION_QOS)
2190  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
2191    final ReplicateWALEntryRequest request) throws ServiceException {
2192    try {
2193      checkOpen();
2194      if (server.getReplicationSinkService() != null) {
2195        requestCount.increment();
2196        List<WALEntry> entries = request.getEntryList();
2197        checkShouldRejectReplicationRequest(entries);
2198        CellScanner cellScanner = getAndReset(controller);
2199        server.getRegionServerCoprocessorHost().preReplicateLogEntries();
2200        server.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
2201          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
2202          request.getSourceHFileArchiveDirPath());
2203        server.getRegionServerCoprocessorHost().postReplicateLogEntries();
2204        return ReplicateWALEntryResponse.newBuilder().build();
2205      } else {
2206        throw new ServiceException("Replication services are not initialized yet");
2207      }
2208    } catch (IOException ie) {
2209      throw new ServiceException(ie);
2210    }
2211  }
2212
2213  /**
2214   * Roll the WAL writer of the region server.
2215   * @param controller the RPC controller
2216   * @param request    the request
2217   */
2218  @Override
2219  public RollWALWriterResponse rollWALWriter(final RpcController controller,
2220    final RollWALWriterRequest request) throws ServiceException {
2221    try {
2222      checkOpen();
2223      requestCount.increment();
2224      server.getRegionServerCoprocessorHost().preRollWALWriterRequest();
2225      server.getWalRoller().requestRollAll();
2226      server.getRegionServerCoprocessorHost().postRollWALWriterRequest();
2227      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
2228      return builder.build();
2229    } catch (IOException ie) {
2230      throw new ServiceException(ie);
2231    }
2232  }
2233
2234  /**
2235   * Stop the region server.
2236   * @param controller the RPC controller
2237   * @param request    the request
2238   */
2239  @Override
2240  @QosPriority(priority = HConstants.ADMIN_QOS)
2241  public StopServerResponse stopServer(final RpcController controller,
2242    final StopServerRequest request) throws ServiceException {
2243    rpcPreCheck("stopServer");
2244    requestCount.increment();
2245    String reason = request.getReason();
2246    server.stop(reason);
2247    return StopServerResponse.newBuilder().build();
2248  }
2249
2250  @Override
2251  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
2252    UpdateFavoredNodesRequest request) throws ServiceException {
2253    rpcPreCheck("updateFavoredNodes");
2254    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
2255    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
2256    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
2257      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
2258      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
2259        server.updateRegionFavoredNodesMapping(hri.getEncodedName(),
2260          regionUpdateInfo.getFavoredNodesList());
2261      }
2262    }
2263    respBuilder.setResponse(openInfoList.size());
2264    return respBuilder.build();
2265  }
2266
2267  /**
2268   * Atomically bulk load several HFiles into an open region
2269   * @return true if successful, false is failed but recoverably (no action)
2270   * @throws ServiceException if failed unrecoverably
2271   */
2272  @Override
2273  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
2274    final BulkLoadHFileRequest request) throws ServiceException {
2275    long start = EnvironmentEdgeManager.currentTime();
2276    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
2277    if (clusterIds.contains(this.server.getClusterId())) {
2278      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
2279    } else {
2280      clusterIds.add(this.server.getClusterId());
2281    }
2282    try {
2283      checkOpen();
2284      requestCount.increment();
2285      HRegion region = getRegion(request.getRegion());
2286      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
2287      long sizeToBeLoaded = -1;
2288
2289      // Check to see if this bulk load would exceed the space quota for this table
2290      if (spaceQuotaEnabled) {
2291        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
2292        SpaceViolationPolicyEnforcement enforcement =
2293          activeSpaceQuotas.getPolicyEnforcement(region);
2294        if (enforcement != null) {
2295          // Bulk loads must still be atomic. We must enact all or none.
2296          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
2297          for (FamilyPath familyPath : request.getFamilyPathList()) {
2298            filePaths.add(familyPath.getPath());
2299          }
2300          // Check if the batch of files exceeds the current quota
2301          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
2302        }
2303      }
2304      // secure bulk load
2305      Map<byte[], List<Path>> map =
2306        server.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
2307      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
2308      builder.setLoaded(map != null);
2309      if (map != null) {
2310        // Treat any negative size as a flag to "ignore" updating the region size as that is
2311        // not possible to occur in real life (cannot bulk load a file with negative size)
2312        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
2313          if (LOG.isTraceEnabled()) {
2314            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
2315              + sizeToBeLoaded + " bytes");
2316          }
2317          // Inform space quotas of the new files for this region
2318          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
2319            sizeToBeLoaded);
2320        }
2321      }
2322      return builder.build();
2323    } catch (IOException ie) {
2324      throw new ServiceException(ie);
2325    } finally {
2326      final MetricsRegionServer metricsRegionServer = server.getMetrics();
2327      if (metricsRegionServer != null) {
2328        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
2329      }
2330    }
2331  }
2332
2333  @Override
2334  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
2335    PrepareBulkLoadRequest request) throws ServiceException {
2336    try {
2337      checkOpen();
2338      requestCount.increment();
2339
2340      HRegion region = getRegion(request.getRegion());
2341
2342      String bulkToken = server.getSecureBulkLoadManager().prepareBulkLoad(region, request);
2343      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
2344      builder.setBulkToken(bulkToken);
2345      return builder.build();
2346    } catch (IOException ie) {
2347      throw new ServiceException(ie);
2348    }
2349  }
2350
2351  @Override
2352  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
2353    CleanupBulkLoadRequest request) throws ServiceException {
2354    try {
2355      checkOpen();
2356      requestCount.increment();
2357
2358      HRegion region = getRegion(request.getRegion());
2359
2360      server.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
2361      return CleanupBulkLoadResponse.newBuilder().build();
2362    } catch (IOException ie) {
2363      throw new ServiceException(ie);
2364    }
2365  }
2366
2367  @Override
2368  public CoprocessorServiceResponse execService(final RpcController controller,
2369    final CoprocessorServiceRequest request) throws ServiceException {
2370    try {
2371      checkOpen();
2372      requestCount.increment();
2373      HRegion region = getRegion(request.getRegion());
2374      Message result = execServiceOnRegion(region, request.getCall());
2375      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
2376      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
2377        region.getRegionInfo().getRegionName()));
2378      // TODO: COPIES!!!!!!
2379      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
2380        org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
2381      return builder.build();
2382    } catch (IOException ie) {
2383      throw new ServiceException(ie);
2384    }
2385  }
2386
2387  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
2388    if (filePaths.isEmpty()) {
2389      // local hdfs
2390      return server.getFileSystem();
2391    }
2392    // source hdfs
2393    return new Path(filePaths.get(0)).getFileSystem(server.getConfiguration());
2394  }
2395
2396  private Message execServiceOnRegion(HRegion region,
2397    final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
2398    // ignore the passed in controller (from the serialized call)
2399    ServerRpcController execController = new ServerRpcController();
2400    return region.execService(execController, serviceCall);
2401  }
2402
2403  private boolean shouldRejectRequestsFromClient(HRegion region) {
2404    TableName table = region.getRegionInfo().getTable();
2405    ReplicationSourceService service = server.getReplicationSourceService();
2406    return service != null && service.getSyncReplicationPeerInfoProvider().checkState(table,
2407      RejectRequestsFromClientStateChecker.get());
2408  }
2409
2410  private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
2411    if (shouldRejectRequestsFromClient(region)) {
2412      throw new DoNotRetryIOException(
2413        region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state.");
2414    }
2415  }
2416
2417  /**
2418   * Get data from a table.
2419   * @param controller the RPC controller
2420   * @param request    the get request
2421   */
2422  @Override
2423  public GetResponse get(final RpcController controller, final GetRequest request)
2424    throws ServiceException {
2425    long before = EnvironmentEdgeManager.currentTime();
2426    OperationQuota quota = null;
2427    HRegion region = null;
2428    try {
2429      checkOpen();
2430      requestCount.increment();
2431      rpcGetRequestCount.increment();
2432      region = getRegion(request.getRegion());
2433      rejectIfInStandByState(region);
2434
2435      GetResponse.Builder builder = GetResponse.newBuilder();
2436      ClientProtos.Get get = request.getGet();
2437      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
2438      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
2439      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
2440      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
2441      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
2442        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
2443          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
2444          + "reverse Scan.");
2445      }
2446      Boolean existence = null;
2447      Result r = null;
2448      RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2449      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
2450
2451      Get clientGet = ProtobufUtil.toGet(get);
2452      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2453        existence = region.getCoprocessorHost().preExists(clientGet);
2454      }
2455      if (existence == null) {
2456        if (context != null) {
2457          r = get(clientGet, (region), null, context);
2458        } else {
2459          // for test purpose
2460          r = region.get(clientGet);
2461        }
2462        if (get.getExistenceOnly()) {
2463          boolean exists = r.getExists();
2464          if (region.getCoprocessorHost() != null) {
2465            exists = region.getCoprocessorHost().postExists(clientGet, exists);
2466          }
2467          existence = exists;
2468        }
2469      }
2470      if (existence != null) {
2471        ClientProtos.Result pbr =
2472          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
2473        builder.setResult(pbr);
2474      } else if (r != null) {
2475        ClientProtos.Result pbr;
2476        if (
2477          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
2478            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
2479        ) {
2480          pbr = ProtobufUtil.toResultNoData(r);
2481          ((HBaseRpcController) controller)
2482            .setCellScanner(CellUtil.createCellScanner(r.rawCells()));
2483          addSize(context, r);
2484        } else {
2485          pbr = ProtobufUtil.toResult(r);
2486        }
2487        builder.setResult(pbr);
2488      }
2489      // r.cells is null when an table.exists(get) call
2490      if (r != null && r.rawCells() != null) {
2491        quota.addGetResult(r);
2492      }
2493      return builder.build();
2494    } catch (IOException ie) {
2495      throw new ServiceException(ie);
2496    } finally {
2497      final MetricsRegionServer metricsRegionServer = server.getMetrics();
2498      if (metricsRegionServer != null) {
2499        TableDescriptor td = region != null ? region.getTableDescriptor() : null;
2500        if (td != null) {
2501          metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before);
2502        }
2503      }
2504      if (quota != null) {
2505        quota.close();
2506      }
2507    }
2508  }
2509
  /**
   * Core single-Get path shared by the get RPC and multi(): runs the pre/post Get coprocessor
   * hooks and reads the row through a region scanner.
   * <p>
   * NOTE(review): when {@code closeCallBack} is null this method dereferences {@code context}
   * to register the scanner for closing — callers appear to guarantee a non-null context in
   * that case; confirm before adding new call sites.
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
    RpcCallContext context) throws IOException {
    region.prepareGet(get);
    // Results from a non-default replica are flagged as stale.
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    long before = EnvironmentEdgeManager.currentTime();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        // Coprocessor bypassed the core get; return whatever it populated in results.
        region.metricsUpdateForGet(results, before);
        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
          stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet(results, before);

    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
  }
2560
2561  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
2562    int sum = 0;
2563    String firstRegionName = null;
2564    for (RegionAction regionAction : request.getRegionActionList()) {
2565      if (sum == 0) {
2566        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
2567      }
2568      sum += regionAction.getActionCount();
2569    }
2570    if (sum > rowSizeWarnThreshold) {
2571      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
2572        + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
2573        + RpcServer.getRequestUserName().orElse(null) + "/"
2574        + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
2575      if (rejectRowsWithSizeOverThreshold) {
2576        throw new ServiceException(
2577          "Rejecting large batch operation for current batch with firstRegionName: "
2578            + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
2579            + rowSizeWarnThreshold);
2580      }
2581    }
2582  }
2583
  /**
   * Marks an entire RegionAction as failed with the given error and appends the failure to the
   * multi response.
   */
  private void failRegionAction(MultiResponse.Builder responseBuilder,
    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
    CellScanner cellScanner, Throwable error) {
    rpcServer.getMetrics().exception(error);
    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
    // All Mutations in this RegionAction not executed as we can not see the Region online here
    // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
    // corresponding to these Mutations. This keeps the scanner aligned with the remaining
    // RegionActions in the request.
    if (cellScanner != null) {
      skipCellsForMutations(regionAction.getActionList(), cellScanner);
    }
  }
2597
2598  private boolean isReplicationRequest(Action action) {
2599    // replication request can only be put or delete.
2600    if (!action.hasMutation()) {
2601      return false;
2602    }
2603    MutationProto mutation = action.getMutation();
2604    MutationType type = mutation.getMutateType();
2605    if (type != MutationType.PUT && type != MutationType.DELETE) {
2606      return false;
2607    }
2608    // replication will set a special attribute so we can make use of it to decide whether a request
2609    // is for replication.
2610    return mutation.getAttributeList().stream().map(p -> p.getName())
2611      .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
2612  }
2613
2614  /**
2615   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2616   * @param rpcc    the RPC controller
2617   * @param request the multi request
2618   */
2619  @Override
2620  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2621    throws ServiceException {
2622    try {
2623      checkOpen();
2624    } catch (IOException ie) {
2625      throw new ServiceException(ie);
2626    }
2627
2628    checkBatchSizeAndLogLargeSize(request);
2629
2630    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2631    // It is also the conduit via which we pass back data.
2632    HBaseRpcController controller = (HBaseRpcController) rpcc;
2633    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2634    if (controller != null) {
2635      controller.setCellScanner(null);
2636    }
2637
2638    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2639
2640    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2641    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2642    this.rpcMultiRequestCount.increment();
2643    this.requestCount.increment();
2644    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2645
2646    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
2647    // following logic is for backward compatibility as old clients still use
2648    // MultiRequest#condition in case of checkAndMutate with RowMutations.
2649    if (request.hasCondition()) {
2650      if (request.getRegionActionList().isEmpty()) {
2651        // If the region action list is empty, do nothing.
2652        responseBuilder.setProcessed(true);
2653        return responseBuilder.build();
2654      }
2655
2656      RegionAction regionAction = request.getRegionAction(0);
2657
2658      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
2659      // we can assume regionAction.getAtomic() is true here.
2660      assert regionAction.getAtomic();
2661
2662      OperationQuota quota;
2663      HRegion region;
2664      RegionSpecifier regionSpecifier = regionAction.getRegion();
2665
2666      try {
2667        region = getRegion(regionSpecifier);
2668        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
2669      } catch (IOException e) {
2670        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2671        return responseBuilder.build();
2672      }
2673
2674      try {
2675        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
2676        // We only allow replication in standby state and it will not set the atomic flag.
2677        if (rejectIfFromClient) {
2678          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2679            new DoNotRetryIOException(
2680              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2681          return responseBuilder.build();
2682        }
2683
2684        try {
2685          CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2686            cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
2687          responseBuilder.setProcessed(result.isSuccess());
2688          ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2689            ClientProtos.ResultOrException.newBuilder();
2690          for (int i = 0; i < regionAction.getActionCount(); i++) {
2691            // To unify the response format with doNonAtomicRegionMutation and read through
2692            // client's AsyncProcess we have to add an empty result instance per operation
2693            resultOrExceptionOrBuilder.clear();
2694            resultOrExceptionOrBuilder.setIndex(i);
2695            regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2696          }
2697        } catch (IOException e) {
2698          rpcServer.getMetrics().exception(e);
2699          // As it's an atomic operation with a condition, we may expect it's a global failure.
2700          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2701        }
2702      } finally {
2703        quota.close();
2704      }
2705
2706      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2707      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2708      if (regionLoadStats != null) {
2709        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
2710          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
2711      }
2712      return responseBuilder.build();
2713    }
2714
2715    // this will contain all the cells that we need to return. It's created later, if needed.
2716    List<CellScannable> cellsToReturn = null;
2717    RegionScannersCloseCallBack closeCallBack = null;
2718    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2719    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
2720      new HashMap<>(request.getRegionActionCount());
2721
2722    for (RegionAction regionAction : request.getRegionActionList()) {
2723      OperationQuota quota;
2724      HRegion region;
2725      RegionSpecifier regionSpecifier = regionAction.getRegion();
2726      regionActionResultBuilder.clear();
2727
2728      try {
2729        region = getRegion(regionSpecifier);
2730        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
2731      } catch (IOException e) {
2732        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2733        continue; // For this region it's a failure.
2734      }
2735
2736      try {
2737        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
2738
2739        if (regionAction.hasCondition()) {
2740          // We only allow replication in standby state and it will not set the atomic flag.
2741          if (rejectIfFromClient) {
2742            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2743              new DoNotRetryIOException(
2744                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2745            continue;
2746          }
2747
2748          try {
2749            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2750              ClientProtos.ResultOrException.newBuilder();
2751            if (regionAction.getActionCount() == 1) {
2752              CheckAndMutateResult result =
2753                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
2754                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
2755              regionActionResultBuilder.setProcessed(result.isSuccess());
2756              resultOrExceptionOrBuilder.setIndex(0);
2757              if (result.getResult() != null) {
2758                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
2759              }
2760              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2761            } else {
2762              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2763                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
2764              regionActionResultBuilder.setProcessed(result.isSuccess());
2765              for (int i = 0; i < regionAction.getActionCount(); i++) {
2766                if (i == 0 && result.getResult() != null) {
2767                  // Set the result of the Increment/Append operations to the first element of the
2768                  // ResultOrException list
2769                  resultOrExceptionOrBuilder.setIndex(i);
2770                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
2771                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
2772                  continue;
2773                }
2774                // To unify the response format with doNonAtomicRegionMutation and read through
2775                // client's AsyncProcess we have to add an empty result instance per operation
2776                resultOrExceptionOrBuilder.clear();
2777                resultOrExceptionOrBuilder.setIndex(i);
2778                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2779              }
2780            }
2781          } catch (IOException e) {
2782            rpcServer.getMetrics().exception(e);
2783            // As it's an atomic operation with a condition, we may expect it's a global failure.
2784            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2785          }
2786        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2787          // We only allow replication in standby state and it will not set the atomic flag.
2788          if (rejectIfFromClient) {
2789            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2790              new DoNotRetryIOException(
2791                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2792            continue;
2793          }
2794          try {
2795            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
2796              cellScanner, nonceGroup, spaceQuotaEnforcement);
2797            regionActionResultBuilder.setProcessed(true);
2798            // We no longer use MultiResponse#processed. Instead, we use
2799            // RegionActionResult#processed. This is for backward compatibility for old clients.
2800            responseBuilder.setProcessed(true);
2801          } catch (IOException e) {
2802            rpcServer.getMetrics().exception(e);
2803            // As it's atomic, we may expect it's a global failure.
2804            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2805          }
2806        } else {
2807          if (
2808            rejectIfFromClient && regionAction.getActionCount() > 0
2809              && !isReplicationRequest(regionAction.getAction(0))
2810          ) {
2811            // fail if it is not a replication request
2812            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2813              new DoNotRetryIOException(
2814                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2815            continue;
2816          }
2817          // doNonAtomicRegionMutation manages the exception internally
2818          if (context != null && closeCallBack == null) {
2819            // An RpcCallBack that creates a list of scanners that needs to perform callBack
2820            // operation on completion of multiGets.
2821            // Set this only once
2822            closeCallBack = new RegionScannersCloseCallBack();
2823            context.setCallBack(closeCallBack);
2824          }
2825          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2826            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
2827            spaceQuotaEnforcement);
2828        }
2829      } finally {
2830        quota.close();
2831      }
2832
2833      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2834      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2835      if (regionLoadStats != null) {
2836        regionStats.put(regionSpecifier, regionLoadStats);
2837      }
2838    }
2839    // Load the controller with the Cells to return.
2840    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2841      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2842    }
2843
2844    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
2845    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
2846      builder.addRegion(stat.getKey());
2847      builder.addStat(stat.getValue());
2848    }
2849    responseBuilder.setRegionStatistics(builder);
2850    return responseBuilder.build();
2851  }
2852
2853  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2854    if (cellScanner == null) {
2855      return;
2856    }
2857    for (Action action : actions) {
2858      skipCellsForMutation(action, cellScanner);
2859    }
2860  }
2861
2862  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2863    if (cellScanner == null) {
2864      return;
2865    }
2866    try {
2867      if (action.hasMutation()) {
2868        MutationProto m = action.getMutation();
2869        if (m.hasAssociatedCellCount()) {
2870          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2871            cellScanner.advance();
2872          }
2873        }
2874      }
2875    } catch (IOException e) {
2876      // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2877      // marked as failed as we could not see the Region here. At client side the top level
2878      // RegionAction exception will be considered first.
2879      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2880    }
2881  }
2882
2883  /**
2884   * Mutate data in a table.
2885   * @param rpcc    the RPC controller
2886   * @param request the mutate request
2887   */
2888  @Override
2889  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
2890    throws ServiceException {
2891    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2892    // It is also the conduit via which we pass back data.
2893    HBaseRpcController controller = (HBaseRpcController) rpcc;
2894    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2895    OperationQuota quota = null;
2896    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2897    // Clear scanner so we are not holding on to reference across call.
2898    if (controller != null) {
2899      controller.setCellScanner(null);
2900    }
2901    try {
2902      checkOpen();
2903      requestCount.increment();
2904      rpcMutateRequestCount.increment();
2905      HRegion region = getRegion(request.getRegion());
2906      rejectIfInStandByState(region);
2907      MutateResponse.Builder builder = MutateResponse.newBuilder();
2908      MutationProto mutation = request.getMutation();
2909      if (!region.getRegionInfo().isMetaRegion()) {
2910        server.getMemStoreFlusher().reclaimMemStoreMemory();
2911      }
2912      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2913      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
2914      ActivePolicyEnforcement spaceQuotaEnforcement =
2915        getSpaceQuotaManager().getActiveEnforcements();
2916
2917      if (request.hasCondition()) {
2918        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
2919          request.getCondition(), nonceGroup, spaceQuotaEnforcement);
2920        builder.setProcessed(result.isSuccess());
2921        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
2922        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
2923        if (clientCellBlockSupported) {
2924          addSize(context, result.getResult());
2925        }
2926      } else {
2927        Result r = null;
2928        Boolean processed = null;
2929        MutationType type = mutation.getMutateType();
2930        switch (type) {
2931          case APPEND:
2932            // TODO: this doesn't actually check anything.
2933            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
2934            break;
2935          case INCREMENT:
2936            // TODO: this doesn't actually check anything.
2937            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
2938            break;
2939          case PUT:
2940            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
2941            processed = Boolean.TRUE;
2942            break;
2943          case DELETE:
2944            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
2945            processed = Boolean.TRUE;
2946            break;
2947          default:
2948            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
2949        }
2950        if (processed != null) {
2951          builder.setProcessed(processed);
2952        }
2953        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
2954        addResult(builder, r, controller, clientCellBlockSupported);
2955        if (clientCellBlockSupported) {
2956          addSize(context, r);
2957        }
2958      }
2959      return builder.build();
2960    } catch (IOException ie) {
2961      server.checkFileSystem();
2962      throw new ServiceException(ie);
2963    } finally {
2964      if (quota != null) {
2965        quota.close();
2966      }
2967    }
2968  }
2969
2970  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
2971    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
2972    long before = EnvironmentEdgeManager.currentTime();
2973    Put put = ProtobufUtil.toPut(mutation, cellScanner);
2974    checkCellSizeLimit(region, put);
2975    spaceQuota.getPolicyEnforcement(region).check(put);
2976    quota.addMutation(put);
2977    region.put(put);
2978
2979    MetricsRegionServer metricsRegionServer = server.getMetrics();
2980    if (metricsRegionServer != null) {
2981      long after = EnvironmentEdgeManager.currentTime();
2982      metricsRegionServer.updatePut(region, after - before);
2983    }
2984  }
2985
2986  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
2987    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
2988    long before = EnvironmentEdgeManager.currentTime();
2989    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
2990    checkCellSizeLimit(region, delete);
2991    spaceQuota.getPolicyEnforcement(region).check(delete);
2992    quota.addMutation(delete);
2993    region.delete(delete);
2994
2995    MetricsRegionServer metricsRegionServer = server.getMetrics();
2996    if (metricsRegionServer != null) {
2997      long after = EnvironmentEdgeManager.currentTime();
2998      metricsRegionServer.updateDelete(region, after - before);
2999    }
3000  }
3001
3002  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
3003    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
3004    ActivePolicyEnforcement spaceQuota) throws IOException {
3005    long before = EnvironmentEdgeManager.currentTime();
3006    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
3007    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
3008    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
3009    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
3010    quota.addMutation((Mutation) checkAndMutate.getAction());
3011
3012    CheckAndMutateResult result = null;
3013    if (region.getCoprocessorHost() != null) {
3014      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
3015    }
3016    if (result == null) {
3017      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
3018      if (region.getCoprocessorHost() != null) {
3019        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
3020      }
3021    }
3022    MetricsRegionServer metricsRegionServer = server.getMetrics();
3023    if (metricsRegionServer != null) {
3024      long after = EnvironmentEdgeManager.currentTime();
3025      metricsRegionServer.updateCheckAndMutate(region, after - before);
3026
3027      MutationType type = mutation.getMutateType();
3028      switch (type) {
3029        case PUT:
3030          metricsRegionServer.updateCheckAndPut(region, after - before);
3031          break;
3032        case DELETE:
3033          metricsRegionServer.updateCheckAndDelete(region, after - before);
3034          break;
3035        default:
3036          break;
3037      }
3038    }
3039    return result;
3040  }
3041
  // This is used to keep compatible with the old client implementation. Consider remove it if we
  // decide to drop the support of the client that still sends close request to a region scanner
  // which has already been exhausted.
  @Deprecated
  private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {

    private static final long serialVersionUID = -4305297078988180130L;

    // Shared singleton exception: suppress stack-trace capture so the creation-site trace is
    // not leaked to every throw site, and so throwing it stays cheap.
    @Override
    public synchronized Throwable fillInStackTrace() {
      return this;
    }
  };
3055
  /**
   * Resolve the live scanner referenced by a scan request.
   * @return the holder tracking the scanner's region, lease and sequence state
   * @throws IOException {@code SCANNER_ALREADY_CLOSED} when the scanner was recently closed
   *           (benign signal for old clients), {@link UnknownScannerException} when the id is
   *           unknown or the lease expired, {@link NotServingRegionException} when the region
   *           instance has changed underneath the scanner.
   */
  private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
    String scannerName = toScannerName(request.getScannerId());
    RegionScannerHolder rsh = this.scanners.get(scannerName);
    if (rsh == null) {
      // just ignore the next or close request if scanner does not exists.
      if (closedScanners.getIfPresent(scannerName) != null) {
        throw SCANNER_ALREADY_CLOSED;
      } else {
        LOG.warn("Client tried to access missing scanner " + scannerName);
        throw new UnknownScannerException(
          "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
            + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
            + "long wait between consecutive client checkins, c) Server may be closing down, "
            + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
            + "possible fix would be increasing the value of"
            + "'hbase.client.scanner.timeout.period' configuration.");
      }
    }
    rejectIfInStandByState(rsh.r);
    RegionInfo hri = rsh.s.getRegionInfo();
    // Yes, should be the same instance
    if (server.getOnlineRegion(hri.getRegionName()) != rsh.r) {
      String msg = "Region has changed on the scanner " + scannerName + ": regionName="
        + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r;
      LOG.warn(msg + ", closing...");
      // Tear down in order: unregister, close the scanner, then cancel its lease; each step
      // is attempted even if the previous one failed.
      scanners.remove(scannerName);
      try {
        rsh.s.close();
      } catch (IOException e) {
        LOG.warn("Getting exception closing " + scannerName, e);
      } finally {
        try {
          server.getLeaseManager().cancelLease(scannerName);
        } catch (LeaseException e) {
          LOG.warn("Getting exception closing " + scannerName, e);
        }
      }
      throw new NotServingRegionException(msg);
    }
    return rsh;
  }
3097
3098  /**
3099   * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
3100   *         value.
3101   */
3102  private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request,
3103    ScanResponse.Builder builder) throws IOException {
3104    HRegion region = getRegion(request.getRegion());
3105    rejectIfInStandByState(region);
3106    ClientProtos.Scan protoScan = request.getScan();
3107    boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
3108    Scan scan = ProtobufUtil.toScan(protoScan);
3109    // if the request doesn't set this, get the default region setting.
3110    if (!isLoadingCfsOnDemandSet) {
3111      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
3112    }
3113
3114    if (!scan.hasFamilies()) {
3115      // Adding all families to scanner
3116      for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
3117        scan.addFamily(family);
3118      }
3119    }
3120    if (region.getCoprocessorHost() != null) {
3121      // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
3122      // wrapper for the core created RegionScanner
3123      region.getCoprocessorHost().preScannerOpen(scan);
3124    }
3125    RegionScannerImpl coreScanner = region.getScanner(scan);
3126    Shipper shipper = coreScanner;
3127    RegionScanner scanner = coreScanner;
3128    try {
3129      if (region.getCoprocessorHost() != null) {
3130        scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
3131      }
3132    } catch (Exception e) {
3133      // Although region coprocessor is for advanced users and they should take care of the
3134      // implementation to not damage the HBase system, closing the scanner on exception here does
3135      // not have any bad side effect, so let's do it
3136      scanner.close();
3137      throw e;
3138    }
3139    long scannerId = scannerIdGenerator.generateNewScannerId();
3140    builder.setScannerId(scannerId);
3141    builder.setMvccReadPoint(scanner.getMvccReadPoint());
3142    builder.setTtl(scannerLeaseTimeoutPeriod);
3143    String scannerName = toScannerName(scannerId);
3144
3145    boolean fullRegionScan =
3146      !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region);
3147
3148    return new Pair<String, RegionScannerHolder>(scannerName,
3149      addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan));
3150  }
3151
3152  /**
3153   * The returned String is used as key doing look up of outstanding Scanners in this Servers'
3154   * this.scanners, the Map of outstanding scanners and their current state.
3155   * @param scannerId A scanner long id.
3156   * @return The long id as a String.
3157   */
3158  private static String toScannerName(long scannerId) {
3159    return Long.toString(scannerId);
3160  }
3161
3162  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
3163    throws OutOfOrderScannerNextException {
3164    // if nextCallSeq does not match throw Exception straight away. This needs to be
3165    // performed even before checking of Lease.
3166    // See HBASE-5974
3167    if (request.hasNextCallSeq()) {
3168      long callSeq = request.getNextCallSeq();
3169      if (!rsh.incNextCallSeq(callSeq)) {
3170        throw new OutOfOrderScannerNextException(
3171          "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: "
3172            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
3173      }
3174    }
3175  }
3176
  /**
   * Re-register a scanner lease that was temporarily removed while serving a scan call.
   * Scanner ids are unique, so the lease cannot already be held; if it is, that indicates a
   * server-side bug and is escalated as an AssertionError.
   */
  private void addScannerLeaseBack(LeaseManager.Lease lease) {
    try {
      server.getLeaseManager().addLease(lease);
    } catch (LeaseStillHeldException e) {
      // should not happen as the scanner id is unique.
      throw new AssertionError(e);
    }
  }
3185
3186  // visible for testing only
3187  long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller,
3188    boolean allowHeartbeatMessages) {
3189    // Set the time limit to be half of the more restrictive timeout value (one of the
3190    // timeout values must be positive). In the event that both values are positive, the
3191    // more restrictive of the two is used to calculate the limit.
3192    if (allowHeartbeatMessages) {
3193      long now = EnvironmentEdgeManager.currentTime();
3194      long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now);
3195      if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) {
3196        long timeLimitDelta;
3197        if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) {
3198          timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout);
3199        } else {
3200          timeLimitDelta =
3201            scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout;
3202        }
3203
3204        // Use half of whichever timeout value was more restrictive... But don't allow
3205        // the time limit to be less than the allowable minimum (could cause an
3206        // immediate timeout before scanning any data).
3207        timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
3208        return now + timeLimitDelta;
3209      }
3210    }
3211    // Default value of timeLimit is negative to indicate no timeLimit should be
3212    // enforced.
3213    return -1L;
3214  }
3215
3216  private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) {
3217    long timeout;
3218    if (controller != null && controller.getCallTimeout() > 0) {
3219      timeout = controller.getCallTimeout();
3220    } else if (rpcTimeout > 0) {
3221      timeout = rpcTimeout;
3222    } else {
3223      return -1;
3224    }
3225    if (call != null) {
3226      timeout -= (now - call.getReceiveTime());
3227    }
3228    // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high.
3229    // return minimum value here in that case so we count this in calculating the final delta.
3230    return Math.max(minimumScanTimeLimitDelta, timeout);
3231  }
3232
3233  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
3234    ScannerContext scannerContext, ScanResponse.Builder builder) {
3235    if (numOfCompleteRows >= limitOfRows) {
3236      if (LOG.isTraceEnabled()) {
3237        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows
3238          + " scannerContext: " + scannerContext);
3239      }
3240      builder.setMoreResults(false);
3241    }
3242  }
3243
3244  // return whether we have more results in region.
3245  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
3246    long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
3247    ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
3248    HRegion region = rsh.r;
3249    RegionScanner scanner = rsh.s;
3250    long maxResultSize;
3251    if (scanner.getMaxResultSize() > 0) {
3252      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
3253    } else {
3254      maxResultSize = maxQuotaResultSize;
3255    }
3256    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
3257    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
3258    // arbitrary 32. TODO: keep record of general size of results being returned.
3259    ArrayList<Cell> values = new ArrayList<>(32);
3260    region.startRegionOperation(Operation.SCAN);
3261    long before = EnvironmentEdgeManager.currentTime();
3262    // Used to check if we've matched the row limit set on the Scan
3263    int numOfCompleteRows = 0;
3264    // Count of times we call nextRaw; can be > numOfCompleteRows.
3265    int numOfNextRawCalls = 0;
3266    try {
3267      int numOfResults = 0;
3268      synchronized (scanner) {
3269        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
3270        boolean clientHandlesPartials =
3271          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
3272        boolean clientHandlesHeartbeats =
3273          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
3274
3275        // On the server side we must ensure that the correct ordering of partial results is
3276        // returned to the client to allow them to properly reconstruct the partial results.
3277        // If the coprocessor host is adding to the result list, we cannot guarantee the
3278        // correct ordering of partial results and so we prevent partial results from being
3279        // formed.
3280        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
3281        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
3282        boolean moreRows = false;
3283
3284        // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
3285        // certain time threshold on the server. When the time threshold is exceeded, the
3286        // server stops the scan and sends back whatever Results it has accumulated within
3287        // that time period (may be empty). Since heartbeat messages have the potential to
3288        // create partial Results (in the event that the timeout occurs in the middle of a
3289        // row), we must only generate heartbeat messages when the client can handle both
3290        // heartbeats AND partials
3291        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
3292
3293        long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);
3294
3295        final LimitScope sizeScope =
3296          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
3297        final LimitScope timeScope =
3298          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
3299
3300        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
3301
3302        // Configure with limits for this RPC. Set keep progress true since size progress
3303        // towards size limit should be kept between calls to nextRaw
3304        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
3305        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
3306        // of heap size occupied by cells(being read). Cell data means its key and value parts.
3307        // maxQuotaResultSize - max results just from server side configuration and quotas, without
3308        // user's specified max. We use this for evaluating limits based on blocks (not cells).
3309        // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
3310        // cell or block size from maximum here so we adhere to total limits of request.
3311        // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
3312        // have accumulated block bytes. If not, this will be 0 for block size.
3313        long maxCellSize = maxResultSize;
3314        long maxBlockSize = maxQuotaResultSize;
3315        if (rpcCall != null) {
3316          maxBlockSize -= rpcCall.getResponseBlockSize();
3317          maxCellSize -= rpcCall.getResponseCellSize();
3318        }
3319
3320        contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
3321        contextBuilder.setBatchLimit(scanner.getBatch());
3322        contextBuilder.setTimeLimit(timeScope, timeLimit);
3323        contextBuilder.setTrackMetrics(trackMetrics);
3324        ScannerContext scannerContext = contextBuilder.build();
3325        boolean limitReached = false;
3326        while (numOfResults < maxResults) {
3327          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
3328          // batch limit is a limit on the number of cells per Result. Thus, if progress is
3329          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
3330          // reset the batch progress between nextRaw invocations since we don't want the
3331          // batch progress from previous calls to affect future calls
3332          scannerContext.setBatchProgress(0);
3333          assert values.isEmpty();
3334
3335          // Collect values to be returned here
3336          moreRows = scanner.nextRaw(values, scannerContext);
3337          if (rpcCall == null) {
3338            // When there is no RpcCallContext,copy EC to heap, then the scanner would close,
3339            // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
3340            // buffers.See more details in HBASE-26036.
3341            CellUtil.cloneIfNecessary(values);
3342          }
3343          numOfNextRawCalls++;
3344
3345          if (!values.isEmpty()) {
3346            if (limitOfRows > 0) {
3347              // First we need to check if the last result is partial and we have a row change. If
3348              // so then we need to increase the numOfCompleteRows.
3349              if (results.isEmpty()) {
3350                if (
3351                  rsh.rowOfLastPartialResult != null
3352                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
3353                ) {
3354                  numOfCompleteRows++;
3355                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
3356                    builder);
3357                }
3358              } else {
3359                Result lastResult = results.get(results.size() - 1);
3360                if (
3361                  lastResult.mayHaveMoreCellsInRow()
3362                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
3363                ) {
3364                  numOfCompleteRows++;
3365                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
3366                    builder);
3367                }
3368              }
3369              if (builder.hasMoreResults() && !builder.getMoreResults()) {
3370                break;
3371              }
3372            }
3373            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
3374            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
3375            results.add(r);
3376            numOfResults++;
3377            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
3378              numOfCompleteRows++;
3379              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
3380              if (builder.hasMoreResults() && !builder.getMoreResults()) {
3381                break;
3382              }
3383            }
3384          } else if (!moreRows && !results.isEmpty()) {
3385            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
3386            // last result is false. Otherwise it's possible that: the first nextRaw returned
3387            // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
3388            // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
3389            // return with moreRows=false, which means moreResultsInRegion would be false, it will
3390            // be a contradictory state (HBASE-21206).
3391            int lastIdx = results.size() - 1;
3392            Result r = results.get(lastIdx);
3393            if (r.mayHaveMoreCellsInRow()) {
3394              results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
3395            }
3396          }
3397          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
3398          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
3399          boolean resultsLimitReached = numOfResults >= maxResults;
3400          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;
3401
3402          if (limitReached || !moreRows) {
3403            // With block size limit, we may exceed size limit without collecting any results.
3404            // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
3405            // or cursor if results were collected, for example for cell size or heap size limits.
3406            boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
3407            // We only want to mark a ScanResponse as a heartbeat message in the event that
3408            // there are more values to be read server side. If there aren't more values,
3409            // marking it as a heartbeat is wasteful because the client will need to issue
3410            // another ScanRequest only to realize that they already have all the values
3411            if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
3412              // Heartbeat messages occur when the time limit has been reached, or size limit has
3413              // been reached before collecting any results. This can happen for heavily filtered
3414              // scans which scan over too many blocks.
3415              builder.setHeartbeatMessage(true);
3416              if (rsh.needCursor) {
3417                Cell cursorCell = scannerContext.getLastPeekedCell();
3418                if (cursorCell != null) {
3419                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
3420                }
3421              }
3422            }
3423            break;
3424          }
3425          values.clear();
3426        }
3427        if (rpcCall != null) {
3428          rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
3429        }
3430        builder.setMoreResultsInRegion(moreRows);
3431        // Check to see if the client requested that we track metrics server side. If the
3432        // client requested metrics, retrieve the metrics from the scanner context.
3433        if (trackMetrics) {
3434          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
3435          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
3436          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
3437
3438          for (Entry<String, Long> entry : metrics.entrySet()) {
3439            pairBuilder.setName(entry.getKey());
3440            pairBuilder.setValue(entry.getValue());
3441            metricBuilder.addMetrics(pairBuilder.build());
3442          }
3443
3444          builder.setScanMetrics(metricBuilder.build());
3445        }
3446      }
3447    } finally {
3448      region.closeRegionOperation();
3449      // Update serverside metrics, even on error.
3450      long end = EnvironmentEdgeManager.currentTime();
3451      long responseCellSize = rpcCall != null ? rpcCall.getResponseCellSize() : 0;
3452      region.getMetrics().updateScanTime(end - before);
3453      final MetricsRegionServer metricsRegionServer = server.getMetrics();
3454      if (metricsRegionServer != null) {
3455        metricsRegionServer.updateScanSize(region, responseCellSize);
3456        metricsRegionServer.updateScanTime(region, end - before);
3457        metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls);
3458      }
3459    }
3460    // coprocessor postNext hook
3461    if (region.getCoprocessorHost() != null) {
3462      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
3463    }
3464  }
3465
3466  /**
3467   * Scan data in a table.
3468   * @param controller the RPC controller
3469   * @param request    the scan request
3470   */
3471  @Override
3472  public ScanResponse scan(final RpcController controller, final ScanRequest request)
3473    throws ServiceException {
3474    if (controller != null && !(controller instanceof HBaseRpcController)) {
3475      throw new UnsupportedOperationException(
3476        "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
3477    }
3478    if (!request.hasScannerId() && !request.hasScan()) {
3479      throw new ServiceException(
3480        new DoNotRetryIOException("Missing required input: scannerId or scan"));
3481    }
3482    try {
3483      checkOpen();
3484    } catch (IOException e) {
3485      if (request.hasScannerId()) {
3486        String scannerName = toScannerName(request.getScannerId());
3487        if (LOG.isDebugEnabled()) {
3488          LOG.debug(
3489            "Server shutting down and client tried to access missing scanner " + scannerName);
3490        }
3491        final LeaseManager leaseManager = server.getLeaseManager();
3492        if (leaseManager != null) {
3493          try {
3494            leaseManager.cancelLease(scannerName);
3495          } catch (LeaseException le) {
3496            // No problem, ignore
3497            if (LOG.isTraceEnabled()) {
3498              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
3499            }
3500          }
3501        }
3502      }
3503      throw new ServiceException(e);
3504    }
3505    requestCount.increment();
3506    rpcScanRequestCount.increment();
3507    RegionScannerHolder rsh;
3508    ScanResponse.Builder builder = ScanResponse.newBuilder();
3509    String scannerName;
3510    try {
3511      if (request.hasScannerId()) {
3512        // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
3513        // for more details.
3514        long scannerId = request.getScannerId();
3515        builder.setScannerId(scannerId);
3516        scannerName = toScannerName(scannerId);
3517        rsh = getRegionScanner(request);
3518      } else {
3519        Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder);
3520        scannerName = scannerNameAndRSH.getFirst();
3521        rsh = scannerNameAndRSH.getSecond();
3522      }
3523    } catch (IOException e) {
3524      if (e == SCANNER_ALREADY_CLOSED) {
3525        // Now we will close scanner automatically if there are no more results for this region but
3526        // the old client will still send a close request to us. Just ignore it and return.
3527        return builder.build();
3528      }
3529      throw new ServiceException(e);
3530    }
3531    if (rsh.fullRegionScan) {
3532      rpcFullScanRequestCount.increment();
3533    }
3534    HRegion region = rsh.r;
3535    LeaseManager.Lease lease;
3536    try {
3537      // Remove lease while its being processed in server; protects against case
3538      // where processing of request takes > lease expiration time. or null if none found.
3539      lease = server.getLeaseManager().removeLease(scannerName);
3540    } catch (LeaseException e) {
3541      throw new ServiceException(e);
3542    }
3543    if (request.hasRenew() && request.getRenew()) {
3544      // add back and return
3545      addScannerLeaseBack(lease);
3546      try {
3547        checkScanNextCallSeq(request, rsh);
3548      } catch (OutOfOrderScannerNextException e) {
3549        throw new ServiceException(e);
3550      }
3551      return builder.build();
3552    }
3553    OperationQuota quota;
3554    try {
3555      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
3556    } catch (IOException e) {
3557      addScannerLeaseBack(lease);
3558      throw new ServiceException(e);
3559    }
3560    try {
3561      checkScanNextCallSeq(request, rsh);
3562    } catch (OutOfOrderScannerNextException e) {
3563      addScannerLeaseBack(lease);
3564      throw new ServiceException(e);
3565    }
3566    // Now we have increased the next call sequence. If we give client an error, the retry will
3567    // never success. So we'd better close the scanner and return a DoNotRetryIOException to client
3568    // and then client will try to open a new scanner.
3569    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
3570    int rows; // this is scan.getCaching
3571    if (request.hasNumberOfRows()) {
3572      rows = request.getNumberOfRows();
3573    } else {
3574      rows = closeScanner ? 0 : 1;
3575    }
3576    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
3577    // now let's do the real scan.
3578    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
3579    RegionScanner scanner = rsh.s;
3580    // this is the limit of rows for this scan, if we the number of rows reach this value, we will
3581    // close the scanner.
3582    int limitOfRows;
3583    if (request.hasLimitOfRows()) {
3584      limitOfRows = request.getLimitOfRows();
3585    } else {
3586      limitOfRows = -1;
3587    }
3588    boolean scannerClosed = false;
3589    try {
3590      List<Result> results = new ArrayList<>(Math.min(rows, 512));
3591      if (rows > 0) {
3592        boolean done = false;
3593        // Call coprocessor. Get region info from scanner.
3594        if (region.getCoprocessorHost() != null) {
3595          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
3596          if (!results.isEmpty()) {
3597            for (Result r : results) {
3598              // add cell size from CP results so we can track response size and update limits
3599              // when calling scan below if !done. We'll also have tracked block size if the CP
3600              // got results from hbase, since StoreScanner tracks that for all calls automatically.
3601              addSize(rpcCall, r);
3602            }
3603          }
3604          if (bypass != null && bypass.booleanValue()) {
3605            done = true;
3606          }
3607        }
3608        if (!done) {
3609          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
3610            results, builder, rpcCall);
3611        } else {
3612          builder.setMoreResultsInRegion(!results.isEmpty());
3613        }
3614      } else {
3615        // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
3616        builder.setMoreResultsInRegion(true);
3617      }
3618
3619      quota.addScanResult(results);
3620      addResults(builder, results, (HBaseRpcController) controller,
3621        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
3622        isClientCellBlockSupport(rpcCall));
3623      if (scanner.isFilterDone() && results.isEmpty()) {
3624        // If the scanner's filter - if any - is done with the scan
3625        // only set moreResults to false if the results is empty. This is used to keep compatible
3626        // with the old scan implementation where we just ignore the returned results if moreResults
3627        // is false. Can remove the isEmpty check after we get rid of the old implementation.
3628        builder.setMoreResults(false);
3629      }
3630      // Later we may close the scanner depending on this flag so here we need to make sure that we
3631      // have already set this flag.
3632      assert builder.hasMoreResultsInRegion();
3633      // we only set moreResults to false in the above code, so set it to true if we haven't set it
3634      // yet.
3635      if (!builder.hasMoreResults()) {
3636        builder.setMoreResults(true);
3637      }
3638      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
3639        // Record the last cell of the last result if it is a partial result
3640        // We need this to calculate the complete rows we have returned to client as the
3641        // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
3642        // current row. We may filter out all the remaining cells for the current row and just
3643        // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
3644        // check for row change.
3645        Result lastResult = results.get(results.size() - 1);
3646        if (lastResult.mayHaveMoreCellsInRow()) {
3647          rsh.rowOfLastPartialResult = lastResult.getRow();
3648        } else {
3649          rsh.rowOfLastPartialResult = null;
3650        }
3651      }
3652      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
3653        scannerClosed = true;
3654        closeScanner(region, scanner, scannerName, rpcCall);
3655      }
3656
3657      // There's no point returning to a timed out client. Throwing ensures scanner is closed
3658      if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) {
3659        throw new TimeoutIOException("Client deadline exceeded, cannot return results");
3660      }
3661
3662      return builder.build();
3663    } catch (IOException e) {
3664      try {
3665        // scanner is closed here
3666        scannerClosed = true;
3667        // The scanner state might be left in a dirty state, so we will tell the Client to
3668        // fail this RPC and close the scanner while opening up another one from the start of
3669        // row that the client has last seen.
3670        closeScanner(region, scanner, scannerName, rpcCall);
3671
3672        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
3673        // used in two different semantics.
3674        // (1) The first is to close the client scanner and bubble up the exception all the way
3675        // to the application. This is preferred when the exception is really un-recoverable
3676        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
3677        // bucket usually.
3678        // (2) Second semantics is to close the current region scanner only, but continue the
3679        // client scanner by overriding the exception. This is usually UnknownScannerException,
3680        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
3681        // application-level ClientScanner has to continue without bubbling up the exception to
3682        // the client. See ClientScanner code to see how it deals with these special exceptions.
3683        if (e instanceof DoNotRetryIOException) {
3684          throw e;
3685        }
3686
3687        // If it is a FileNotFoundException, wrap as a
3688        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
3689        if (e instanceof FileNotFoundException) {
3690          throw new DoNotRetryIOException(e);
3691        }
3692
3693        // We closed the scanner already. Instead of throwing the IOException, and client
3694        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
3695        // a special exception to save an RPC.
3696        if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
3697          // 1.4.0+ clients know how to handle
3698          throw new ScannerResetException("Scanner is closed on the server-side", e);
3699        } else {
3700          // older clients do not know about SRE. Just throw USE, which they will handle
3701          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
3702            + " scanner state for clients older than 1.3.", e);
3703        }
3704      } catch (IOException ioe) {
3705        throw new ServiceException(ioe);
3706      }
3707    } finally {
3708      if (!scannerClosed) {
3709        // Adding resets expiration time on lease.
3710        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
3711        if (rpcCall != null) {
3712          rpcCall.setCallBack(rsh.shippedCallback);
3713        } else {
3714          // If context is null,here we call rsh.shippedCallback directly to reuse the logic in
3715          // rsh.shippedCallback to release the internal resources in rsh,and lease is also added
3716          // back to regionserver's LeaseManager in rsh.shippedCallback.
3717          runShippedCallback(rsh);
3718        }
3719      }
3720      quota.close();
3721    }
3722  }
3723
3724  private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
3725    assert rsh.shippedCallback != null;
3726    try {
3727      rsh.shippedCallback.run();
3728    } catch (IOException ioe) {
3729      throw new ServiceException(ioe);
3730    }
3731  }
3732
3733  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
3734    RpcCallContext context) throws IOException {
3735    if (region.getCoprocessorHost() != null) {
3736      if (region.getCoprocessorHost().preScannerClose(scanner)) {
3737        // bypass the actual close.
3738        return;
3739      }
3740    }
3741    RegionScannerHolder rsh = scanners.remove(scannerName);
3742    if (rsh != null) {
3743      if (context != null) {
3744        context.setCallBack(rsh.closeCallBack);
3745      } else {
3746        rsh.s.close();
3747      }
3748      if (region.getCoprocessorHost() != null) {
3749        region.getCoprocessorHost().postScannerClose(scanner);
3750      }
3751      closedScanners.put(scannerName, scannerName);
3752    }
3753  }
3754
3755  @Override
3756  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
3757    CoprocessorServiceRequest request) throws ServiceException {
3758    rpcPreCheck("execRegionServerService");
3759    return server.execRegionServerService(controller, request);
3760  }
3761
3762  @Override
3763  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
3764    GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
3765    try {
3766      final RegionServerSpaceQuotaManager manager = server.getRegionServerSpaceQuotaManager();
3767      final GetSpaceQuotaSnapshotsResponse.Builder builder =
3768        GetSpaceQuotaSnapshotsResponse.newBuilder();
3769      if (manager != null) {
3770        final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
3771        for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
3772          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
3773            .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
3774            .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build());
3775        }
3776      }
3777      return builder.build();
3778    } catch (Exception e) {
3779      throw new ServiceException(e);
3780    }
3781  }
3782
3783  @Override
3784  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
3785    ClearRegionBlockCacheRequest request) throws ServiceException {
3786    rpcPreCheck("clearRegionBlockCache");
3787    ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder();
3788    CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
3789    List<HRegion> regions = getRegions(request.getRegionList(), stats);
3790    for (HRegion region : regions) {
3791      try {
3792        stats = stats.append(this.server.clearRegionBlockCache(region));
3793      } catch (Exception e) {
3794        stats.addException(region.getRegionInfo().getRegionName(), e);
3795      }
3796    }
3797    stats.withMaxCacheSize(server.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
3798    return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
3799  }
3800
3801  private void executeOpenRegionProcedures(OpenRegionRequest request,
3802    Map<TableName, TableDescriptor> tdCache) {
3803    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
3804    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
3805      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
3806      TableName tableName = regionInfo.getTable();
3807      TableDescriptor tableDesc = tdCache.get(tableName);
3808      if (tableDesc == null) {
3809        try {
3810          tableDesc = server.getTableDescriptors().get(regionInfo.getTable());
3811        } catch (IOException e) {
3812          // Here we do not fail the whole method since we also need deal with other
3813          // procedures, and we can not ignore this one, so we still schedule a
3814          // AssignRegionHandler and it will report back to master if we still can not get the
3815          // TableDescriptor.
3816          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
3817            regionInfo.getTable(), e);
3818        }
3819        if (tableDesc != null) {
3820          tdCache.put(tableName, tableDesc);
3821        }
3822      }
3823      if (regionOpenInfo.getFavoredNodesCount() > 0) {
3824        server.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
3825          regionOpenInfo.getFavoredNodesList());
3826      }
3827      long procId = regionOpenInfo.getOpenProcId();
3828      if (server.submitRegionProcedure(procId)) {
3829        server.getExecutorService().submit(
3830          AssignRegionHandler.create(server, regionInfo, procId, tableDesc, masterSystemTime));
3831      }
3832    }
3833  }
3834
3835  private void executeCloseRegionProcedures(CloseRegionRequest request) {
3836    String encodedName;
3837    try {
3838      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
3839    } catch (DoNotRetryIOException e) {
3840      throw new UncheckedIOException("Should not happen", e);
3841    }
3842    ServerName destination = request.hasDestinationServer()
3843      ? ProtobufUtil.toServerName(request.getDestinationServer())
3844      : null;
3845    long procId = request.getCloseProcId();
3846    boolean evictCache = request.getEvictCache();
3847    if (server.submitRegionProcedure(procId)) {
3848      server.getExecutorService().submit(
3849        UnassignRegionHandler.create(server, encodedName, procId, false, destination, evictCache));
3850    }
3851  }
3852
3853  private void executeProcedures(RemoteProcedureRequest request) {
3854    RSProcedureCallable callable;
3855    try {
3856      callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
3857        .getDeclaredConstructor().newInstance();
3858    } catch (Exception e) {
3859      LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(),
3860        request.getProcId(), e);
3861      server.remoteProcedureComplete(request.getProcId(), e);
3862      return;
3863    }
3864    callable.init(request.getProcData().toByteArray(), server);
3865    LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
3866    server.executeProcedure(request.getProcId(), callable);
3867  }
3868
3869  @Override
3870  @QosPriority(priority = HConstants.ADMIN_QOS)
3871  public ExecuteProceduresResponse executeProcedures(RpcController controller,
3872    ExecuteProceduresRequest request) throws ServiceException {
3873    try {
3874      checkOpen();
3875      throwOnWrongStartCode(request);
3876      server.getRegionServerCoprocessorHost().preExecuteProcedures();
3877      if (request.getOpenRegionCount() > 0) {
3878        // Avoid reading from the TableDescritor every time(usually it will read from the file
3879        // system)
3880        Map<TableName, TableDescriptor> tdCache = new HashMap<>();
3881        request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
3882      }
3883      if (request.getCloseRegionCount() > 0) {
3884        request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
3885      }
3886      if (request.getProcCount() > 0) {
3887        request.getProcList().forEach(this::executeProcedures);
3888      }
3889      server.getRegionServerCoprocessorHost().postExecuteProcedures();
3890      return ExecuteProceduresResponse.getDefaultInstance();
3891    } catch (IOException e) {
3892      throw new ServiceException(e);
3893    }
3894  }
3895
3896  @Override
3897  public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller,
3898    GetAllBootstrapNodesRequest request) throws ServiceException {
3899    GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder();
3900    server.getBootstrapNodes()
3901      .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server)));
3902    return builder.build();
3903  }
3904}