001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.io.UncheckedIOException;
024import java.net.BindException;
025import java.net.InetAddress;
026import java.net.InetSocketAddress;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.Iterator;
032import java.util.List;
033import java.util.Map;
034import java.util.Map.Entry;
035import java.util.NavigableMap;
036import java.util.Set;
037import java.util.TreeSet;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentMap;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicBoolean;
042import java.util.concurrent.atomic.AtomicLong;
043import java.util.concurrent.atomic.LongAdder;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.FileSystem;
046import org.apache.hadoop.fs.Path;
047import org.apache.hadoop.hbase.CacheEvictionStats;
048import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
049import org.apache.hadoop.hbase.Cell;
050import org.apache.hadoop.hbase.CellScannable;
051import org.apache.hadoop.hbase.CellScanner;
052import org.apache.hadoop.hbase.CellUtil;
053import org.apache.hadoop.hbase.DoNotRetryIOException;
054import org.apache.hadoop.hbase.DroppedSnapshotException;
055import org.apache.hadoop.hbase.HBaseIOException;
056import org.apache.hadoop.hbase.HBaseRpcServicesBase;
057import org.apache.hadoop.hbase.HConstants;
058import org.apache.hadoop.hbase.MultiActionResultTooLarge;
059import org.apache.hadoop.hbase.NotServingRegionException;
060import org.apache.hadoop.hbase.PrivateCellUtil;
061import org.apache.hadoop.hbase.RegionTooBusyException;
062import org.apache.hadoop.hbase.Server;
063import org.apache.hadoop.hbase.ServerName;
064import org.apache.hadoop.hbase.TableName;
065import org.apache.hadoop.hbase.UnknownScannerException;
066import org.apache.hadoop.hbase.client.Append;
067import org.apache.hadoop.hbase.client.CheckAndMutate;
068import org.apache.hadoop.hbase.client.CheckAndMutateResult;
069import org.apache.hadoop.hbase.client.Delete;
070import org.apache.hadoop.hbase.client.Durability;
071import org.apache.hadoop.hbase.client.Get;
072import org.apache.hadoop.hbase.client.Increment;
073import org.apache.hadoop.hbase.client.Mutation;
074import org.apache.hadoop.hbase.client.OperationWithAttributes;
075import org.apache.hadoop.hbase.client.Put;
076import org.apache.hadoop.hbase.client.RegionInfo;
077import org.apache.hadoop.hbase.client.RegionReplicaUtil;
078import org.apache.hadoop.hbase.client.Result;
079import org.apache.hadoop.hbase.client.Row;
080import org.apache.hadoop.hbase.client.Scan;
081import org.apache.hadoop.hbase.client.TableDescriptor;
082import org.apache.hadoop.hbase.client.VersionInfoUtil;
083import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
084import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
085import org.apache.hadoop.hbase.exceptions.ScannerResetException;
086import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
087import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
088import org.apache.hadoop.hbase.io.ByteBuffAllocator;
089import org.apache.hadoop.hbase.io.hfile.BlockCache;
090import org.apache.hadoop.hbase.ipc.HBaseRpcController;
091import org.apache.hadoop.hbase.ipc.PriorityFunction;
092import org.apache.hadoop.hbase.ipc.QosPriority;
093import org.apache.hadoop.hbase.ipc.RpcCall;
094import org.apache.hadoop.hbase.ipc.RpcCallContext;
095import org.apache.hadoop.hbase.ipc.RpcCallback;
096import org.apache.hadoop.hbase.ipc.RpcServer;
097import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
098import org.apache.hadoop.hbase.ipc.RpcServerFactory;
099import org.apache.hadoop.hbase.ipc.RpcServerInterface;
100import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
101import org.apache.hadoop.hbase.ipc.ServerRpcController;
102import org.apache.hadoop.hbase.net.Address;
103import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
104import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
105import org.apache.hadoop.hbase.quotas.OperationQuota;
106import org.apache.hadoop.hbase.quotas.QuotaUtil;
107import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
108import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
109import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
110import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
111import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease;
112import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException;
113import org.apache.hadoop.hbase.regionserver.Region.Operation;
114import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
115import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
116import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler;
117import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
118import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
119import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
120import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
121import org.apache.hadoop.hbase.replication.ReplicationUtils;
122import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker;
123import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker;
124import org.apache.hadoop.hbase.security.Superusers;
125import org.apache.hadoop.hbase.security.access.Permission;
126import org.apache.hadoop.hbase.util.Bytes;
127import org.apache.hadoop.hbase.util.DNS;
128import org.apache.hadoop.hbase.util.DNS.ServerType;
129import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
130import org.apache.hadoop.hbase.util.Pair;
131import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
132import org.apache.hadoop.hbase.wal.WAL;
133import org.apache.hadoop.hbase.wal.WALEdit;
134import org.apache.hadoop.hbase.wal.WALKey;
135import org.apache.hadoop.hbase.wal.WALSplitUtil;
136import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
137import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
138import org.apache.yetus.audience.InterfaceAudience;
139import org.slf4j.Logger;
140import org.slf4j.LoggerFactory;
141
142import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
143import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
144import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
145import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
146import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
147import org.apache.hbase.thirdparty.com.google.protobuf.Message;
148import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
149import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
150import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
151import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
152import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
153
154import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
155import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
156import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
243
244/**
245 * Implements the regionserver RPC services.
246 */
247@InterfaceAudience.Private
248public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
249  implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface {
250
  private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);

  /** Configuration key naming the RPC scheduler factory class to use for the region server. */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Upper bound on the cumulative size of results a single scanner may return per call.
  private final long maxScannerResultSize;

  // Generates the ids used as keys into the scanners map below.
  private ScannerIdGenerator scannerIdGenerator;
  // Live scanners, keyed by scanner id (as a String).
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
  // which may send next or close request to a region scanner which has already been exhausted. The
  // entries will be removed automatically after scannerLeaseTimeoutPeriod.
  private final Cache<String, String> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;
  /*
   * Whether we should reject requests with very high no of rows i.e. beyond threshold defined by
   * rowSizeWarnThreshold
   */
  private final boolean rejectRowsWithSizeOverThreshold;

  // Set while a clear-compaction-queues request is in flight so concurrent requests are rejected.
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below booleans
   * to selectively enable/disable either Admin or Client Service (Rare is the case where you would
   * ever turn off one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
    "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
    "hbase.regionserver.client.executorService";
  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
    "hbase.regionserver.client.meta.executorService";
346
347  /**
348   * An Rpc callback for closing a RegionScanner.
349   */
350  private static final class RegionScannerCloseCallBack implements RpcCallback {
351
352    private final RegionScanner scanner;
353
354    public RegionScannerCloseCallBack(RegionScanner scanner) {
355      this.scanner = scanner;
356    }
357
358    @Override
359    public void run() throws IOException {
360      this.scanner.close();
361    }
362  }
363
364  /**
365   * An Rpc callback for doing shipped() call on a RegionScanner.
366   */
367  private class RegionScannerShippedCallBack implements RpcCallback {
368    private final String scannerName;
369    private final Shipper shipper;
370    private final Lease lease;
371
372    public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
373      this.scannerName = scannerName;
374      this.shipper = shipper;
375      this.lease = lease;
376    }
377
378    @Override
379    public void run() throws IOException {
380      this.shipper.shipped();
381      // We're done. On way out re-add the above removed lease. The lease was temp removed for this
382      // Rpc call and we are at end of the call now. Time to add it back.
383      if (scanners.containsKey(scannerName)) {
384        if (lease != null) {
385          server.getLeaseManager().addLease(lease);
386        }
387      }
388    }
389  }
390
391  /**
392   * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
393   * completion of multiGets.
394   */
395  static class RegionScannersCloseCallBack implements RpcCallback {
396    private final List<RegionScanner> scanners = new ArrayList<>();
397
398    public void addScanner(RegionScanner scanner) {
399      this.scanners.add(scanner);
400    }
401
402    @Override
403    public void run() {
404      for (RegionScanner scanner : scanners) {
405        try {
406          scanner.close();
407        } catch (IOException e) {
408          LOG.error("Exception while closing the scanner " + scanner, e);
409        }
410      }
411    }
412  }
413
414  /**
415   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
416   */
417  static final class RegionScannerHolder {
418    private final AtomicLong nextCallSeq = new AtomicLong(0);
419    private final RegionScanner s;
420    private final HRegion r;
421    private final RpcCallback closeCallBack;
422    private final RpcCallback shippedCallback;
423    private byte[] rowOfLastPartialResult;
424    private boolean needCursor;
425    private boolean fullRegionScan;
426    private final String clientIPAndPort;
427    private final String userName;
428
429    RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
430      RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
431      String clientIPAndPort, String userName) {
432      this.s = s;
433      this.r = r;
434      this.closeCallBack = closeCallBack;
435      this.shippedCallback = shippedCallback;
436      this.needCursor = needCursor;
437      this.fullRegionScan = fullRegionScan;
438      this.clientIPAndPort = clientIPAndPort;
439      this.userName = userName;
440    }
441
442    long getNextCallSeq() {
443      return nextCallSeq.get();
444    }
445
446    boolean incNextCallSeq(long currentSeq) {
447      // Use CAS to prevent multiple scan request running on the same scanner.
448      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
449    }
450
451    // Should be called only when we need to print lease expired messages otherwise
452    // cache the String once made.
453    @Override
454    public String toString() {
455      return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName
456        + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString();
457    }
458  }
459
460  /**
461   * Instantiated as a scanner lease. If the lease times out, the scanner is closed
462   */
463  private class ScannerListener implements LeaseListener {
464    private final String scannerName;
465
466    ScannerListener(final String n) {
467      this.scannerName = n;
468    }
469
470    @Override
471    public void leaseExpired() {
472      RegionScannerHolder rsh = scanners.remove(this.scannerName);
473      if (rsh == null) {
474        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
475        return;
476      }
477      LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
478      server.getMetrics().incrScannerLeaseExpired();
479      RegionScanner s = rsh.s;
480      HRegion region = null;
481      try {
482        region = server.getRegion(s.getRegionInfo().getRegionName());
483        if (region != null && region.getCoprocessorHost() != null) {
484          region.getCoprocessorHost().preScannerClose(s);
485        }
486      } catch (IOException e) {
487        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
488      } finally {
489        try {
490          s.close();
491          if (region != null && region.getCoprocessorHost() != null) {
492            region.getCoprocessorHost().postScannerClose(s);
493          }
494        } catch (IOException e) {
495          LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
496        }
497      }
498    }
499  }
500
501  private static ResultOrException getResultOrException(final ClientProtos.Result r,
502    final int index) {
503    return getResultOrException(ResponseConverter.buildActionResult(r), index);
504  }
505
506  private static ResultOrException getResultOrException(final Exception e, final int index) {
507    return getResultOrException(ResponseConverter.buildActionResult(e), index);
508  }
509
510  private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
511    final int index) {
512    return builder.setIndex(index).build();
513  }
514
515  /**
516   * Checks for the following pre-checks in order:
517   * <ol>
518   * <li>RegionServer is running</li>
519   * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
520   * </ol>
521   * @param requestName name of rpc request. Used in reporting failures to provide context.
522   * @throws ServiceException If any of the above listed pre-check fails.
523   */
524  private void rpcPreCheck(String requestName) throws ServiceException {
525    try {
526      checkOpen();
527      requirePermission(requestName, Permission.Action.ADMIN);
528    } catch (IOException ioe) {
529      throw new ServiceException(ioe);
530    }
531  }
532
533  private boolean isClientCellBlockSupport(RpcCallContext context) {
534    return context != null && context.isClientCellBlockSupported();
535  }
536
537  private void addResult(final MutateResponse.Builder builder, final Result result,
538    final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
539    if (result == null) return;
540    if (clientCellBlockSupported) {
541      builder.setResult(ProtobufUtil.toResultNoData(result));
542      rpcc.setCellScanner(result.cellScanner());
543    } else {
544      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
545      builder.setResult(pbr);
546    }
547  }
548
549  private void addResults(ScanResponse.Builder builder, List<Result> results,
550    HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
551    builder.setStale(!isDefaultRegion);
552    if (results.isEmpty()) {
553      return;
554    }
555    if (clientCellBlockSupported) {
556      for (Result res : results) {
557        builder.addCellsPerResult(res.size());
558        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
559      }
560      controller.setCellScanner(CellUtil.createCellScanner(results));
561    } else {
562      for (Result res : results) {
563        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
564        builder.addResults(pbr);
565      }
566    }
567  }
568
  /**
   * Executes a checkAndMutate carrying a batch of mutation actions: decodes each mutation,
   * applies cell-size and space-quota checks, then runs the region's checkAndMutate wrapped by
   * the coprocessor pre/post hooks. Get actions are rejected. Any cells belonging to actions
   * that were not fully decoded are skipped in the cell scanner on the way out.
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
    CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Number of actions whose cells were fully consumed from cellScanner; used in finally
    // to skip the cells of any remaining actions so the scanner stays consistent.
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      List<Mutation> mutations = new ArrayList<>();
      // Nonce comes from the last INCREMENT/APPEND action seen (PUT/DELETE carry none).
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : actions) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto mutation = action.getMutation();
        MutationType type = mutation.getMutateType();
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(mutation, cellScanner);
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            mutations.add(put);
            break;
          case DELETE:
            Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
            ++countOfCompleteMutation;
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            mutations.add(del);
            break;
          case INCREMENT:
            Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, increment);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
            mutations.add(increment);
            break;
          case APPEND:
            Append append = ProtobufUtil.toAppend(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, append);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
            mutations.add(append);
            break;
          default:
            throw new DoNotRetryIOException("invalid mutation type : " + type);
        }
      }

      if (mutations.size() == 0) {
        // Nothing to mutate; report success with no result.
        return new CheckAndMutateResult(true, null);
      } else {
        CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
        CheckAndMutateResult result = null;
        if (region.getCoprocessorHost() != null) {
          // A non-null pre-hook result bypasses the region operation entirely.
          result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
        }
        if (result == null) {
          result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
          if (region.getCoprocessorHost() != null) {
            result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
          }
        }
        return result;
      }
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }
645
646  /**
647   * Execute an append mutation.
648   * @return result to return to client if default operation should be bypassed as indicated by
649   *         RegionObserver, null otherwise
650   */
651  private Result append(final HRegion region, final OperationQuota quota,
652    final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
653    ActivePolicyEnforcement spaceQuota) throws IOException {
654    long before = EnvironmentEdgeManager.currentTime();
655    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
656    checkCellSizeLimit(region, append);
657    spaceQuota.getPolicyEnforcement(region).check(append);
658    quota.addMutation(append);
659    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
660    Result r = region.append(append, nonceGroup, nonce);
661    if (server.getMetrics() != null) {
662      server.getMetrics().updateAppend(region.getTableDescriptor().getTableName(),
663        EnvironmentEdgeManager.currentTime() - before);
664    }
665    return r == null ? Result.EMPTY_RESULT : r;
666  }
667
668  /**
669   * Execute an increment mutation.
670   */
671  private Result increment(final HRegion region, final OperationQuota quota,
672    final MutationProto mutation, final CellScanner cells, long nonceGroup,
673    ActivePolicyEnforcement spaceQuota) throws IOException {
674    long before = EnvironmentEdgeManager.currentTime();
675    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
676    checkCellSizeLimit(region, increment);
677    spaceQuota.getPolicyEnforcement(region).check(increment);
678    quota.addMutation(increment);
679    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
680    Result r = region.increment(increment, nonceGroup, nonce);
681    final MetricsRegionServer metricsRegionServer = server.getMetrics();
682    if (metricsRegionServer != null) {
683      metricsRegionServer.updateIncrement(region.getTableDescriptor().getTableName(),
684        EnvironmentEdgeManager.currentTime() - before);
685    }
686    return r == null ? Result.EMPTY_RESULT : r;
687  }
688
689  /**
690   * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
691   * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
692   * @param cellsToReturn Could be null. May be allocated in this method. This is what this method
693   *                      returns as a 'result'.
694   * @param closeCallBack the callback to be used with multigets
695   * @param context       the current RpcCallContext
696   * @return Return the <code>cellScanner</code> passed
697   */
698  private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
699    final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
700    final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
701    final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
702    ActivePolicyEnforcement spaceQuotaEnforcement) {
703    // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
704    // one at a time, we instead pass them in batch. Be aware that the corresponding
705    // ResultOrException instance that matches each Put or Delete is then added down in the
706    // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
707    // deferred/batched
708    List<ClientProtos.Action> mutations = null;
709    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
710    IOException sizeIOE = null;
711    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
712      ResultOrException.newBuilder();
713    boolean hasResultOrException = false;
714    for (ClientProtos.Action action : actions.getActionList()) {
715      hasResultOrException = false;
716      resultOrExceptionBuilder.clear();
717      try {
718        Result r = null;
719
720        if (
721          context != null && context.isRetryImmediatelySupported()
722            && (context.getResponseCellSize() > maxQuotaResultSize
723              || context.getResponseBlockSize() + context.getResponseExceptionSize()
724                  > maxQuotaResultSize)
725        ) {
726
727          // We're storing the exception since the exception and reason string won't
728          // change after the response size limit is reached.
729          if (sizeIOE == null) {
730            // We don't need the stack un-winding do don't throw the exception.
731            // Throwing will kill the JVM's JIT.
732            //
733            // Instead just create the exception and then store it.
734            sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
735              + context.getResponseCellSize() + " BlockSize: " + context.getResponseBlockSize());
736
737            // Only report the exception once since there's only one request that
738            // caused the exception. Otherwise this number will dominate the exceptions count.
739            rpcServer.getMetrics().exception(sizeIOE);
740          }
741
742          // Now that there's an exception is known to be created
743          // use it for the response.
744          //
745          // This will create a copy in the builder.
746          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
747          resultOrExceptionBuilder.setException(pair);
748          context.incrementResponseExceptionSize(pair.getSerializedSize());
749          resultOrExceptionBuilder.setIndex(action.getIndex());
750          builder.addResultOrException(resultOrExceptionBuilder.build());
751          skipCellsForMutation(action, cellScanner);
752          continue;
753        }
754        if (action.hasGet()) {
755          long before = EnvironmentEdgeManager.currentTime();
756          ClientProtos.Get pbGet = action.getGet();
757          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
758          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
759          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
760          // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
761          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
762            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
763              + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
764              + "reverse Scan.");
765          }
766          try {
767            Get get = ProtobufUtil.toGet(pbGet);
768            if (context != null) {
769              r = get(get, (region), closeCallBack, context);
770            } else {
771              r = region.get(get);
772            }
773          } finally {
774            final MetricsRegionServer metricsRegionServer = server.getMetrics();
775            if (metricsRegionServer != null) {
776              metricsRegionServer.updateGet(region.getTableDescriptor().getTableName(),
777                EnvironmentEdgeManager.currentTime() - before);
778            }
779          }
780        } else if (action.hasServiceCall()) {
781          hasResultOrException = true;
782          Message result = execServiceOnRegion(region, action.getServiceCall());
783          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
784            ClientProtos.CoprocessorServiceResult.newBuilder();
785          resultOrExceptionBuilder.setServiceResult(serviceResultBuilder
786            .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName())
787              // TODO: Copy!!!
788              .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
789        } else if (action.hasMutation()) {
790          MutationType type = action.getMutation().getMutateType();
791          if (
792            type != MutationType.PUT && type != MutationType.DELETE && mutations != null
793              && !mutations.isEmpty()
794          ) {
795            // Flush out any Puts or Deletes already collected.
796            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
797              spaceQuotaEnforcement);
798            mutations.clear();
799          }
800          switch (type) {
801            case APPEND:
802              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
803                spaceQuotaEnforcement);
804              break;
805            case INCREMENT:
806              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
807                spaceQuotaEnforcement);
808              break;
809            case PUT:
810            case DELETE:
811              // Collect the individual mutations and apply in a batch
812              if (mutations == null) {
813                mutations = new ArrayList<>(actions.getActionCount());
814              }
815              mutations.add(action);
816              break;
817            default:
818              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
819          }
820        } else {
821          throw new HBaseIOException("Unexpected Action type");
822        }
823        if (r != null) {
824          ClientProtos.Result pbResult = null;
825          if (isClientCellBlockSupport(context)) {
826            pbResult = ProtobufUtil.toResultNoData(r);
827            // Hard to guess the size here. Just make a rough guess.
828            if (cellsToReturn == null) {
829              cellsToReturn = new ArrayList<>();
830            }
831            cellsToReturn.add(r);
832          } else {
833            pbResult = ProtobufUtil.toResult(r);
834          }
835          addSize(context, r);
836          hasResultOrException = true;
837          resultOrExceptionBuilder.setResult(pbResult);
838        }
839        // Could get to here and there was no result and no exception. Presumes we added
840        // a Put or Delete to the collecting Mutations List for adding later. In this
841        // case the corresponding ResultOrException instance for the Put or Delete will be added
842        // down in the doNonAtomicBatchOp method call rather than up here.
843      } catch (IOException ie) {
844        rpcServer.getMetrics().exception(ie);
845        hasResultOrException = true;
846        NameBytesPair pair = ResponseConverter.buildException(ie);
847        resultOrExceptionBuilder.setException(pair);
848        context.incrementResponseExceptionSize(pair.getSerializedSize());
849      }
850      if (hasResultOrException) {
851        // Propagate index.
852        resultOrExceptionBuilder.setIndex(action.getIndex());
853        builder.addResultOrException(resultOrExceptionBuilder.build());
854      }
855    }
856    // Finish up any outstanding mutations
857    if (!CollectionUtils.isEmpty(mutations)) {
858      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
859    }
860    return cellsToReturn;
861  }
862
863  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
864    if (r.maxCellSize > 0) {
865      CellScanner cells = m.cellScanner();
866      while (cells.advance()) {
867        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
868        if (size > r.maxCellSize) {
869          String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of "
870            + r.maxCellSize + " bytes";
871          LOG.debug(msg);
872          throw new DoNotRetryIOException(msg);
873        }
874      }
875    }
876  }
877
  /**
   * Execute a batch of mutations atomically (mutateRow / RowMutations semantics). Any failure is
   * propagated to the caller as an exception rather than per-action results.
   */
  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Just throw the exception. The exception will be caught and then added to the region-level
    // exception for the RegionAction. Leaving the null action result is ok since a null
    // result is viewed as a failure by the hbase client. And the region-level exception will be
    // used to replace the null result. See AsyncRequestFutureImpl#receiveMultiAction and
    // AsyncBatchRpcRetryingCaller#onComplete for more details.
    doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
  }
888
889  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
890    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
891    ActivePolicyEnforcement spaceQuotaEnforcement) {
892    try {
893      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
894        spaceQuotaEnforcement, false);
895    } catch (IOException e) {
896      // Set the exception for each action. The mutations in same RegionAction are group to
897      // different batch and then be processed individually. Hence, we don't set the region-level
898      // exception here for whole RegionAction.
899      for (Action mutation : mutations) {
900        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
901      }
902    }
903  }
904
905  /**
906   * Execute a list of mutations.
907   */
908  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
909    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
910    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
911    throws IOException {
912    Mutation[] mArray = new Mutation[mutations.size()];
913    long before = EnvironmentEdgeManager.currentTime();
914    boolean batchContainsPuts = false, batchContainsDelete = false;
915    try {
916      /**
917       * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions
918       * since mutation array may have been reoredered.In order to return the right result or
919       * exception to the corresponding actions, We need to know which action is the mutation belong
920       * to. We can't sort ClientProtos.Action array, since they are bonded to cellscanners.
921       */
922      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
923      int i = 0;
924      long nonce = HConstants.NO_NONCE;
925      for (ClientProtos.Action action : mutations) {
926        if (action.hasGet()) {
927          throw new DoNotRetryIOException(
928            "Atomic put and/or delete only, not a Get=" + action.getGet());
929        }
930        MutationProto m = action.getMutation();
931        Mutation mutation;
932        switch (m.getMutateType()) {
933          case PUT:
934            mutation = ProtobufUtil.toPut(m, cells);
935            batchContainsPuts = true;
936            break;
937
938          case DELETE:
939            mutation = ProtobufUtil.toDelete(m, cells);
940            batchContainsDelete = true;
941            break;
942
943          case INCREMENT:
944            mutation = ProtobufUtil.toIncrement(m, cells);
945            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
946            break;
947
948          case APPEND:
949            mutation = ProtobufUtil.toAppend(m, cells);
950            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
951            break;
952
953          default:
954            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
955        }
956        mutationActionMap.put(mutation, action);
957        mArray[i++] = mutation;
958        checkCellSizeLimit(region, mutation);
959        // Check if a space quota disallows this mutation
960        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
961        quota.addMutation(mutation);
962      }
963
964      if (!region.getRegionInfo().isMetaRegion()) {
965        server.getMemStoreFlusher().reclaimMemStoreMemory();
966      }
967
968      // HBASE-17924
969      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
970      // order is preserved as its expected from the client
971      if (!atomic) {
972        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
973      }
974
975      OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);
976
977      // When atomic is true, it indicates that the mutateRow API or the batch API with
978      // RowMutations is called. In this case, we need to merge the results of the
979      // Increment/Append operations if the mutations include those operations, and set the merged
980      // result to the first element of the ResultOrException list
981      if (atomic) {
982        List<ResultOrException> resultOrExceptions = new ArrayList<>();
983        List<Result> results = new ArrayList<>();
984        for (i = 0; i < codes.length; i++) {
985          if (codes[i].getResult() != null) {
986            results.add(codes[i].getResult());
987          }
988          if (i != 0) {
989            resultOrExceptions
990              .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i));
991          }
992        }
993
994        if (results.isEmpty()) {
995          builder.addResultOrException(
996            getResultOrException(ClientProtos.Result.getDefaultInstance(), 0));
997        } else {
998          // Merge the results of the Increment/Append operations
999          List<Cell> cellList = new ArrayList<>();
1000          for (Result result : results) {
1001            if (result.rawCells() != null) {
1002              cellList.addAll(Arrays.asList(result.rawCells()));
1003            }
1004          }
1005          Result result = Result.create(cellList);
1006
1007          // Set the merged result of the Increment/Append operations to the first element of the
1008          // ResultOrException list
1009          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
1010        }
1011
1012        builder.addAllResultOrException(resultOrExceptions);
1013        return;
1014      }
1015
1016      for (i = 0; i < codes.length; i++) {
1017        Mutation currentMutation = mArray[i];
1018        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
1019        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
1020        Exception e;
1021        switch (codes[i].getOperationStatusCode()) {
1022          case BAD_FAMILY:
1023            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
1024            builder.addResultOrException(getResultOrException(e, index));
1025            break;
1026
1027          case SANITY_CHECK_FAILURE:
1028            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
1029            builder.addResultOrException(getResultOrException(e, index));
1030            break;
1031
1032          default:
1033            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
1034            builder.addResultOrException(getResultOrException(e, index));
1035            break;
1036
1037          case SUCCESS:
1038            builder.addResultOrException(
1039              getResultOrException(ClientProtos.Result.getDefaultInstance(), index));
1040            break;
1041
1042          case STORE_TOO_BUSY:
1043            e = new RegionTooBusyException(codes[i].getExceptionMsg());
1044            builder.addResultOrException(getResultOrException(e, index));
1045            break;
1046        }
1047      }
1048    } finally {
1049      int processedMutationIndex = 0;
1050      for (Action mutation : mutations) {
1051        // The non-null mArray[i] means the cell scanner has been read.
1052        if (mArray[processedMutationIndex++] == null) {
1053          skipCellsForMutation(mutation, cells);
1054        }
1055      }
1056      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1057    }
1058  }
1059
1060  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
1061    boolean batchContainsDelete) {
1062    final MetricsRegionServer metricsRegionServer = server.getMetrics();
1063    if (metricsRegionServer != null) {
1064      long after = EnvironmentEdgeManager.currentTime();
1065      if (batchContainsPuts) {
1066        metricsRegionServer.updatePutBatch(region.getTableDescriptor().getTableName(),
1067          after - starttime);
1068      }
1069      if (batchContainsDelete) {
1070        metricsRegionServer.updateDeleteBatch(region.getTableDescriptor().getTableName(),
1071          after - starttime);
1072      }
1073    }
1074  }
1075
1076  /**
1077   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
1078   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
1079   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
1080   *         exceptionMessage if any
1081   * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying
1082   *             edits for secondary replicas any more, see
1083   *             {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}.
1084   */
1085  @Deprecated
1086  private OperationStatus[] doReplayBatchOp(final HRegion region,
1087    final List<MutationReplay> mutations, long replaySeqId) throws IOException {
1088    long before = EnvironmentEdgeManager.currentTime();
1089    boolean batchContainsPuts = false, batchContainsDelete = false;
1090    try {
1091      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
1092        MutationReplay m = it.next();
1093
1094        if (m.getType() == MutationType.PUT) {
1095          batchContainsPuts = true;
1096        } else {
1097          batchContainsDelete = true;
1098        }
1099
1100        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
1101        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
1102        if (metaCells != null && !metaCells.isEmpty()) {
1103          for (Cell metaCell : metaCells) {
1104            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
1105            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1106            HRegion hRegion = region;
1107            if (compactionDesc != null) {
1108              // replay the compaction. Remove the files from stores only if we are the primary
1109              // region replica (thus own the files)
1110              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
1111                replaySeqId);
1112              continue;
1113            }
1114            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
1115            if (flushDesc != null && !isDefaultReplica) {
1116              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
1117              continue;
1118            }
1119            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
1120            if (regionEvent != null && !isDefaultReplica) {
1121              hRegion.replayWALRegionEventMarker(regionEvent);
1122              continue;
1123            }
1124            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
1125            if (bulkLoadEvent != null) {
1126              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
1127              continue;
1128            }
1129          }
1130          it.remove();
1131        }
1132      }
1133      requestCount.increment();
1134      if (!region.getRegionInfo().isMetaRegion()) {
1135        server.getMemStoreFlusher().reclaimMemStoreMemory();
1136      }
1137      return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
1138        replaySeqId);
1139    } finally {
1140      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1141    }
1142  }
1143
1144  private void closeAllScanners() {
1145    // Close any outstanding scanners. Means they'll get an UnknownScanner
1146    // exception next time they come in.
1147    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
1148      try {
1149        e.getValue().s.close();
1150      } catch (IOException ioe) {
1151        LOG.warn("Closing scanner " + e.getKey(), ioe);
1152      }
1153    }
1154  }
1155
  // Directly invoked only for testing
  public RSRpcServices(final HRegionServer rs) throws IOException {
    // NOTE: the superclass constructor is expected to initialize rpcServer, which is used below.
    super(rs, rs.getProcessName());
    final Configuration conf = rs.getConfiguration();
    // Row count at which a multi() batch is considered oversized (warn or reject, see below).
    rowSizeWarnThreshold =
      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    // Whether oversized batches are rejected outright rather than merely logged.
    rejectRowsWithSizeOverThreshold =
      conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
    // Scanner lease duration; also used below as the closed-scanner cache expiry.
    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
    rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
    rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
    // Cache of recently closed scanners, expiring after the scanner lease timeout.
    closedScanners = CacheBuilder.newBuilder()
      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }
1176
  /** Region servers enable the byte-buffer reservoir by default. */
  @Override
  protected boolean defaultReservoirEnabled() {
    return true;
  }
1181
  /** Identifies this process as a region server for DNS interface/hostname resolution. */
  @Override
  protected ServerType getDNSServerType() {
    return DNS.ServerType.REGIONSERVER;
  }
1186
  /** Returns the configured RPC bind hostname, or {@code defaultHostname} when unset. */
  @Override
  protected String getHostname(Configuration conf, String defaultHostname) {
    return conf.get("hbase.regionserver.ipc.address", defaultHostname);
  }
1191
  /** Configuration key used to look up the region server port. */
  @Override
  protected String getPortConfigName() {
    return HConstants.REGIONSERVER_PORT;
  }
1196
  /** Port used when no region server port is configured. */
  @Override
  protected int getDefaultPort() {
    return HConstants.DEFAULT_REGIONSERVER_PORT;
  }
1201
  /**
   * Returns the configured RpcScheduler factory class, defaulting to
   * {@link SimpleRpcSchedulerFactory}.
   */
  @Override
  protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }
1207
1208  protected RpcServerInterface createRpcServer(final Server server,
1209    final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
1210    final String name) throws IOException {
1211    final Configuration conf = server.getConfiguration();
1212    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
1213    try {
1214      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, // use final
1215                                                                                        // bindAddress
1216                                                                                        // for this
1217                                                                                        // server.
1218        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
1219    } catch (BindException be) {
1220      throw new IOException(be.getMessage() + ". To switch ports use the '"
1221        + HConstants.REGIONSERVER_PORT + "' configuration property.",
1222        be.getCause() != null ? be.getCause() : be);
1223    }
1224  }
1225
1226  protected Class<?> getRpcSchedulerFactoryClass() {
1227    final Configuration conf = server.getConfiguration();
1228    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
1229      SimpleRpcSchedulerFactory.class);
1230  }
1231
  /** Creates the priority function used to rank incoming region server RPCs. */
  protected PriorityFunction createPriority() {
    return new RSAnnotationReadingPriorityFunction(this);
  }
1235
  /** Returns the number of currently outstanding scanners. */
  public int getScannersCount() {
    return scanners.size();
  }
1239
1240  /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
1241  RegionScanner getScanner(long scannerId) {
1242    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
1243    return rsh == null ? null : rsh.s;
1244  }
1245
  /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null. */
  private RegionScannerHolder getRegionScannerHolder(long scannerId) {
    // Scanner ids are keyed by their string form in the scanners map.
    return scanners.get(toScannerName(scannerId));
  }
1250
1251  public String getScanDetailsWithId(long scannerId) {
1252    RegionScanner scanner = getScanner(scannerId);
1253    if (scanner == null) {
1254      return null;
1255    }
1256    StringBuilder builder = new StringBuilder();
1257    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
1258    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
1259    builder.append(" operation_id: ").append(scanner.getOperationId());
1260    return builder.toString();
1261  }
1262
1263  public String getScanDetailsWithRequest(ScanRequest request) {
1264    try {
1265      if (!request.hasRegion()) {
1266        return null;
1267      }
1268      Region region = getRegion(request.getRegion());
1269      StringBuilder builder = new StringBuilder();
1270      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
1271      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
1272      for (NameBytesPair pair : request.getScan().getAttributeList()) {
1273        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
1274          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
1275          break;
1276        }
1277      }
1278      return builder.toString();
1279    } catch (IOException ignored) {
1280      return null;
1281    }
1282  }
1283
1284  /**
1285   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
1286   */
1287  long getScannerVirtualTime(long scannerId) {
1288    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
1289    return rsh == null ? 0L : rsh.getNextCallSeq();
1290  }
1291
1292  /**
1293   * Method to account for the size of retained cells.
1294   * @param context rpc call context
1295   * @param r       result to add size.
1296   * @return an object that represents the last referenced block from this response.
1297   */
1298  void addSize(RpcCallContext context, Result r) {
1299    if (context != null && r != null && !r.isEmpty()) {
1300      for (Cell c : r.rawCells()) {
1301        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
1302      }
1303    }
1304  }
1305
1306  /** Returns Remote client's ip and port else null if can't be determined. */
1307  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
1308      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
1309  static String getRemoteClientIpAndPort() {
1310    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
1311    if (rpcCall == null) {
1312      return HConstants.EMPTY_STRING;
1313    }
1314    InetAddress address = rpcCall.getRemoteAddress();
1315    if (address == null) {
1316      return HConstants.EMPTY_STRING;
1317    }
1318    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
1319    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
1320    // scanning than a hostname anyways.
1321    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
1322  }
1323
1324  /** Returns Remote client's username. */
1325  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
1326      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
1327  static String getUserName() {
1328    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
1329    if (rpcCall == null) {
1330      return HConstants.EMPTY_STRING;
1331    }
1332    return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING);
1333  }
1334
1335  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
1336    HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
1337    Lease lease = server.getLeaseManager().createLease(scannerName, this.scannerLeaseTimeoutPeriod,
1338      new ScannerListener(scannerName));
1339    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
1340    RpcCallback closeCallback =
1341      s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s);
1342    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
1343      needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
1344    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
1345    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
1346      + scannerName + ", " + existing;
1347    return rsh;
1348  }
1349
1350  private boolean isFullRegionScan(Scan scan, HRegion region) {
1351    // If the scan start row equals or less than the start key of the region
1352    // and stop row greater than equals end key (if stop row present)
1353    // or if the stop row is empty
1354    // account this as a full region scan
1355    if (
1356      Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0
1357        && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0
1358          && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)
1359          || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
1360    ) {
1361      return true;
1362    }
1363    return false;
1364  }
1365
1366  /**
1367   * Find the HRegion based on a region specifier
1368   * @param regionSpecifier the region specifier
1369   * @return the corresponding region
1370   * @throws IOException if the specifier is not null, but failed to find the region
1371   */
1372  public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException {
1373    return server.getRegion(regionSpecifier.getValue().toByteArray());
1374  }
1375
1376  /**
1377   * Find the List of HRegions based on a list of region specifiers
1378   * @param regionSpecifiers the list of region specifiers
1379   * @return the corresponding list of regions
1380   * @throws IOException if any of the specifiers is not null, but failed to find the region
1381   */
1382  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
1383    final CacheEvictionStatsBuilder stats) {
1384    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
1385    for (RegionSpecifier regionSpecifier : regionSpecifiers) {
1386      try {
1387        regions.add(server.getRegion(regionSpecifier.getValue().toByteArray()));
1388      } catch (NotServingRegionException e) {
1389        stats.addException(regionSpecifier.getValue().toByteArray(), e);
1390      }
1391    }
1392    return regions;
1393  }
1394
  /** Returns this server's {@link PriorityFunction} used to grade incoming requests. */
  PriorityFunction getPriority() {
    return priority;
  }
1398
  /** Returns the hosting server's RPC quota manager. */
  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return server.getRegionServerRpcQuotaManager();
  }
1402
  /** Returns the hosting server's space quota manager. */
  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return server.getRegionServerSpaceQuotaManager();
  }
1406
  /**
   * Starts the RPC services: seeds the scanner id generator with this server's name (scanner ids
   * must be unique for the server's whole lifecycle) and then starts the internal services.
   */
  void start(ZKWatcher zkWatcher) {
    this.scannerIdGenerator = new ScannerIdGenerator(this.server.getServerName());
    internalStart(zkWatcher);
  }
1411
  /** Stops the RPC services, closing all open scanners before shutting down the internals. */
  void stop() {
    closeAllScanners();
    internalStop();
  }
1416
1417  /**
1418   * Called to verify that this server is up and running.
1419   */
1420  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
1421  protected void checkOpen() throws IOException {
1422    if (server.isAborted()) {
1423      throw new RegionServerAbortedException("Server " + server.getServerName() + " aborting");
1424    }
1425    if (server.isStopped()) {
1426      throw new RegionServerStoppedException("Server " + server.getServerName() + " stopping");
1427    }
1428    if (!server.isDataFileSystemOk()) {
1429      throw new RegionServerStoppedException("File system not available");
1430    }
1431    if (!server.isOnline()) {
1432      throw new ServerNotRunningYetException(
1433        "Server " + server.getServerName() + " is not running yet");
1434    }
1435  }
1436
1437  /**
1438   * By default, put up an Admin and a Client Service. Set booleans
1439   * <code>hbase.regionserver.admin.executorService</code> and
1440   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
1441   * Default is that both are enabled.
1442   * @return immutable list of blocking services and the security info classes that this server
1443   *         supports
1444   */
1445  protected List<BlockingServiceAndInterface> getServices() {
1446    boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
1447    boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
1448    boolean clientMeta =
1449      getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
1450    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
1451    if (client) {
1452      bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
1453        ClientService.BlockingInterface.class));
1454    }
1455    if (admin) {
1456      bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
1457        AdminService.BlockingInterface.class));
1458    }
1459    if (clientMeta) {
1460      bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
1461        ClientMetaService.BlockingInterface.class));
1462    }
1463    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
1464  }
1465
1466  /**
1467   * Close a region on the region server.
1468   * @param controller the RPC controller
1469   * @param request    the request
1470   */
1471  @Override
1472  @QosPriority(priority = HConstants.ADMIN_QOS)
1473  public CloseRegionResponse closeRegion(final RpcController controller,
1474    final CloseRegionRequest request) throws ServiceException {
1475    final ServerName sn = (request.hasDestinationServer()
1476      ? ProtobufUtil.toServerName(request.getDestinationServer())
1477      : null);
1478
1479    try {
1480      checkOpen();
1481      throwOnWrongStartCode(request);
1482      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1483
1484      requestCount.increment();
1485      if (sn == null) {
1486        LOG.info("Close " + encodedRegionName + " without moving");
1487      } else {
1488        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1489      }
1490      boolean closed = server.closeRegion(encodedRegionName, false, sn);
1491      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1492      return builder.build();
1493    } catch (IOException ie) {
1494      throw new ServiceException(ie);
1495    }
1496  }
1497
1498  /**
1499   * Compact a region on the region server.
1500   * @param controller the RPC controller
1501   * @param request    the request
1502   */
1503  @Override
1504  @QosPriority(priority = HConstants.ADMIN_QOS)
1505  public CompactRegionResponse compactRegion(final RpcController controller,
1506    final CompactRegionRequest request) throws ServiceException {
1507    try {
1508      checkOpen();
1509      requestCount.increment();
1510      HRegion region = getRegion(request.getRegion());
1511      // Quota support is enabled, the requesting user is not system/super user
1512      // and a quota policy is enforced that disables compactions.
1513      if (
1514        QuotaUtil.isQuotaEnabled(getConfiguration())
1515          && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null))
1516          && this.server.getRegionServerSpaceQuotaManager()
1517            .areCompactionsDisabled(region.getTableDescriptor().getTableName())
1518      ) {
1519        throw new DoNotRetryIOException(
1520          "Compactions on this region are " + "disabled due to a space quota violation.");
1521      }
1522      region.startRegionOperation(Operation.COMPACT_REGION);
1523      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1524      boolean major = request.hasMajor() && request.getMajor();
1525      if (request.hasFamily()) {
1526        byte[] family = request.getFamily().toByteArray();
1527        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
1528          + region.getRegionInfo().getRegionNameAsString() + " and family "
1529          + Bytes.toString(family);
1530        LOG.trace(log);
1531        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
1532          CompactionLifeCycleTracker.DUMMY);
1533      } else {
1534        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
1535          + region.getRegionInfo().getRegionNameAsString();
1536        LOG.trace(log);
1537        region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
1538      }
1539      return CompactRegionResponse.newBuilder().build();
1540    } catch (IOException ie) {
1541      throw new ServiceException(ie);
1542    }
1543  }
1544
1545  @Override
1546  public CompactionSwitchResponse compactionSwitch(RpcController controller,
1547    CompactionSwitchRequest request) throws ServiceException {
1548    rpcPreCheck("compactionSwitch");
1549    final CompactSplit compactSplitThread = server.getCompactSplitThread();
1550    requestCount.increment();
1551    boolean prevState = compactSplitThread.isCompactionsEnabled();
1552    CompactionSwitchResponse response =
1553      CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
1554    if (prevState == request.getEnabled()) {
1555      // passed in requested state is same as current state. No action required
1556      return response;
1557    }
1558    compactSplitThread.switchCompaction(request.getEnabled());
1559    return response;
1560  }
1561
1562  /**
1563   * Flush a region on the region server.
1564   * @param controller the RPC controller
1565   * @param request    the request
1566   */
1567  @Override
1568  @QosPriority(priority = HConstants.ADMIN_QOS)
1569  public FlushRegionResponse flushRegion(final RpcController controller,
1570    final FlushRegionRequest request) throws ServiceException {
1571    try {
1572      checkOpen();
1573      requestCount.increment();
1574      HRegion region = getRegion(request.getRegion());
1575      LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1576      boolean shouldFlush = true;
1577      if (request.hasIfOlderThanTs()) {
1578        shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1579      }
1580      FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1581      if (shouldFlush) {
1582        boolean writeFlushWalMarker =
1583          request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false;
1584        // Go behind the curtain so we can manage writing of the flush WAL marker
1585        HRegion.FlushResultImpl flushResult = null;
1586        if (request.hasFamily()) {
1587          List families = new ArrayList();
1588          families.add(request.getFamily().toByteArray());
1589          flushResult =
1590            region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1591        } else {
1592          flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1593        }
1594        boolean compactionNeeded = flushResult.isCompactionNeeded();
1595        if (compactionNeeded) {
1596          server.getCompactSplitThread().requestSystemCompaction(region,
1597            "Compaction through user triggered flush");
1598        }
1599        builder.setFlushed(flushResult.isFlushSucceeded());
1600        builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1601      }
1602      builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1603      return builder.build();
1604    } catch (DroppedSnapshotException ex) {
1605      // Cache flush can fail in a few places. If it fails in a critical
1606      // section, we get a DroppedSnapshotException and a replay of wal
1607      // is required. Currently the only way to do this is a restart of
1608      // the server.
1609      server.abort("Replay of WAL required. Forcing server shutdown", ex);
1610      throw new ServiceException(ex);
1611    } catch (IOException ie) {
1612      throw new ServiceException(ie);
1613    }
1614  }
1615
1616  @Override
1617  @QosPriority(priority = HConstants.ADMIN_QOS)
1618  public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1619    final GetOnlineRegionRequest request) throws ServiceException {
1620    try {
1621      checkOpen();
1622      requestCount.increment();
1623      Map<String, HRegion> onlineRegions = server.getOnlineRegions();
1624      List<RegionInfo> list = new ArrayList<>(onlineRegions.size());
1625      for (HRegion region : onlineRegions.values()) {
1626        list.add(region.getRegionInfo());
1627      }
1628      list.sort(RegionInfo.COMPARATOR);
1629      return ResponseConverter.buildGetOnlineRegionResponse(list);
1630    } catch (IOException ie) {
1631      throw new ServiceException(ie);
1632    }
1633  }
1634
1635  // Master implementation of this Admin Service differs given it is not
1636  // able to supply detail only known to RegionServer. See note on
1637  // MasterRpcServers#getRegionInfo.
1638  @Override
1639  @QosPriority(priority = HConstants.ADMIN_QOS)
1640  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
1641    final GetRegionInfoRequest request) throws ServiceException {
1642    try {
1643      checkOpen();
1644      requestCount.increment();
1645      HRegion region = getRegion(request.getRegion());
1646      RegionInfo info = region.getRegionInfo();
1647      byte[] bestSplitRow;
1648      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
1649        bestSplitRow = region.checkSplit(true).orElse(null);
1650        // when all table data are in memstore, bestSplitRow = null
1651        // try to flush region first
1652        if (bestSplitRow == null) {
1653          region.flush(true);
1654          bestSplitRow = region.checkSplit(true).orElse(null);
1655        }
1656      } else {
1657        bestSplitRow = null;
1658      }
1659      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
1660      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
1661      if (request.hasCompactionState() && request.getCompactionState()) {
1662        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
1663      }
1664      builder.setSplittable(region.isSplittable());
1665      builder.setMergeable(region.isMergeable());
1666      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
1667        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
1668      }
1669      return builder.build();
1670    } catch (IOException ie) {
1671      throw new ServiceException(ie);
1672    }
1673  }
1674
1675  @Override
1676  @QosPriority(priority = HConstants.ADMIN_QOS)
1677  public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
1678    throws ServiceException {
1679
1680    List<HRegion> regions;
1681    if (request.hasTableName()) {
1682      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
1683      regions = server.getRegions(tableName);
1684    } else {
1685      regions = server.getRegions();
1686    }
1687    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
1688    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
1689    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1690
1691    try {
1692      for (HRegion region : regions) {
1693        rLoads.add(server.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
1694      }
1695    } catch (IOException e) {
1696      throw new ServiceException(e);
1697    }
1698    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
1699    builder.addAllRegionLoads(rLoads);
1700    return builder.build();
1701  }
1702
1703  @Override
1704  @QosPriority(priority = HConstants.ADMIN_QOS)
1705  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
1706    ClearCompactionQueuesRequest request) throws ServiceException {
1707    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
1708      + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
1709    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
1710    requestCount.increment();
1711    if (clearCompactionQueues.compareAndSet(false, true)) {
1712      final CompactSplit compactSplitThread = server.getCompactSplitThread();
1713      try {
1714        checkOpen();
1715        server.getRegionServerCoprocessorHost().preClearCompactionQueues();
1716        for (String queueName : request.getQueueNameList()) {
1717          LOG.debug("clear " + queueName + " compaction queue");
1718          switch (queueName) {
1719            case "long":
1720              compactSplitThread.clearLongCompactionsQueue();
1721              break;
1722            case "short":
1723              compactSplitThread.clearShortCompactionsQueue();
1724              break;
1725            default:
1726              LOG.warn("Unknown queue name " + queueName);
1727              throw new IOException("Unknown queue name " + queueName);
1728          }
1729        }
1730        server.getRegionServerCoprocessorHost().postClearCompactionQueues();
1731      } catch (IOException ie) {
1732        throw new ServiceException(ie);
1733      } finally {
1734        clearCompactionQueues.set(false);
1735      }
1736    } else {
1737      LOG.warn("Clear compactions queue is executing by other admin.");
1738    }
1739    return respBuilder.build();
1740  }
1741
1742  /**
1743   * Get some information of the region server.
1744   * @param controller the RPC controller
1745   * @param request    the request
1746   */
1747  @Override
1748  @QosPriority(priority = HConstants.ADMIN_QOS)
1749  public GetServerInfoResponse getServerInfo(final RpcController controller,
1750    final GetServerInfoRequest request) throws ServiceException {
1751    try {
1752      checkOpen();
1753    } catch (IOException ie) {
1754      throw new ServiceException(ie);
1755    }
1756    requestCount.increment();
1757    int infoPort = server.getInfoServer() != null ? server.getInfoServer().getPort() : -1;
1758    return ResponseConverter.buildGetServerInfoResponse(server.getServerName(), infoPort);
1759  }
1760
1761  @Override
1762  @QosPriority(priority = HConstants.ADMIN_QOS)
1763  public GetStoreFileResponse getStoreFile(final RpcController controller,
1764    final GetStoreFileRequest request) throws ServiceException {
1765    try {
1766      checkOpen();
1767      HRegion region = getRegion(request.getRegion());
1768      requestCount.increment();
1769      Set<byte[]> columnFamilies;
1770      if (request.getFamilyCount() == 0) {
1771        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
1772      } else {
1773        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
1774        for (ByteString cf : request.getFamilyList()) {
1775          columnFamilies.add(cf.toByteArray());
1776        }
1777      }
1778      int nCF = columnFamilies.size();
1779      List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
1780      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1781      builder.addAllStoreFile(fileList);
1782      return builder.build();
1783    } catch (IOException ie) {
1784      throw new ServiceException(ie);
1785    }
1786  }
1787
1788  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
1789    if (!request.hasServerStartCode()) {
1790      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
1791      return;
1792    }
1793    throwOnWrongStartCode(request.getServerStartCode());
1794  }
1795
1796  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
1797    if (!request.hasServerStartCode()) {
1798      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
1799      return;
1800    }
1801    throwOnWrongStartCode(request.getServerStartCode());
1802  }
1803
1804  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
1805    // check that we are the same server that this RPC is intended for.
1806    if (server.getServerName().getStartcode() != serverStartCode) {
1807      throw new ServiceException(new DoNotRetryIOException(
1808        "This RPC was intended for a " + "different server with startCode: " + serverStartCode
1809          + ", this server is: " + server.getServerName()));
1810    }
1811  }
1812
1813  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
1814    if (req.getOpenRegionCount() > 0) {
1815      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
1816        throwOnWrongStartCode(openReq);
1817      }
1818    }
1819    if (req.getCloseRegionCount() > 0) {
1820      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
1821        throwOnWrongStartCode(closeReq);
1822      }
1823    }
1824  }
1825
1826  /**
1827   * Open asynchronously a region or a set of regions on the region server. The opening is
1828   * coordinated by ZooKeeper, and this method requires the znode to be created before being called.
1829   * As a consequence, this method should be called only from the master.
1830   * <p>
1831   * Different manages states for the region are:
1832   * </p>
1833   * <ul>
1834   * <li>region not opened: the region opening will start asynchronously.</li>
1835   * <li>a close is already in progress: this is considered as an error.</li>
1836   * <li>an open is already in progress: this new open request will be ignored. This is important
1837   * because the Master can do multiple requests if it crashes.</li>
1838   * <li>the region is already opened: this new open request will be ignored.</li>
1839   * </ul>
1840   * <p>
1841   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
1842   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
1843   * errors are put in the response as FAILED_OPENING.
1844   * </p>
1845   * @param controller the RPC controller
1846   * @param request    the request
1847   */
1848  @Override
1849  @QosPriority(priority = HConstants.ADMIN_QOS)
1850  public OpenRegionResponse openRegion(final RpcController controller,
1851    final OpenRegionRequest request) throws ServiceException {
1852    requestCount.increment();
1853    throwOnWrongStartCode(request);
1854
1855    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
1856    final int regionCount = request.getOpenInfoCount();
1857    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
1858    final boolean isBulkAssign = regionCount > 1;
1859    try {
1860      checkOpen();
1861    } catch (IOException ie) {
1862      TableName tableName = null;
1863      if (regionCount == 1) {
1864        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
1865          request.getOpenInfo(0).getRegion();
1866        if (ri != null) {
1867          tableName = ProtobufUtil.toTableName(ri.getTableName());
1868        }
1869      }
1870      if (!TableName.META_TABLE_NAME.equals(tableName)) {
1871        throw new ServiceException(ie);
1872      }
1873      // We are assigning meta, wait a little for regionserver to finish initialization.
1874      // Default to quarter of RPC timeout
1875      int timeout = server.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1876        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
1877      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
1878      synchronized (server.online) {
1879        try {
1880          while (
1881            EnvironmentEdgeManager.currentTime() <= endTime && !server.isStopped()
1882              && !server.isOnline()
1883          ) {
1884            server.online.wait(server.getMsgInterval());
1885          }
1886          checkOpen();
1887        } catch (InterruptedException t) {
1888          Thread.currentThread().interrupt();
1889          throw new ServiceException(t);
1890        } catch (IOException e) {
1891          throw new ServiceException(e);
1892        }
1893      }
1894    }
1895
1896    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1897
1898    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
1899      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
1900      TableDescriptor htd;
1901      try {
1902        String encodedName = region.getEncodedName();
1903        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
1904        final HRegion onlineRegion = server.getRegion(encodedName);
1905        if (onlineRegion != null) {
1906          // The region is already online. This should not happen any more.
1907          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
1908            + ", which is already online";
1909          LOG.warn(error);
1910          // server.abort(error);
1911          // throw new IOException(error);
1912          builder.addOpeningState(RegionOpeningState.OPENED);
1913          continue;
1914        }
1915        LOG.info("Open " + region.getRegionNameAsString());
1916
1917        final Boolean previous =
1918          server.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);
1919
1920        if (Boolean.FALSE.equals(previous)) {
1921          if (server.getRegion(encodedName) != null) {
1922            // There is a close in progress. This should not happen any more.
1923            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
1924              + ", which we are already trying to CLOSE";
1925            server.abort(error);
1926            throw new IOException(error);
1927          }
1928          server.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
1929        }
1930
1931        if (Boolean.TRUE.equals(previous)) {
1932          // An open is in progress. This is supported, but let's log this.
1933          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
1934            + ", which we are already trying to OPEN"
1935            + " - ignoring this new request for this region.");
1936        }
1937
1938        // We are opening this region. If it moves back and forth for whatever reason, we don't
1939        // want to keep returning the stale moved record while we are opening/if we close again.
1940        server.removeFromMovedRegions(region.getEncodedName());
1941
1942        if (previous == null || !previous.booleanValue()) {
1943          htd = htds.get(region.getTable());
1944          if (htd == null) {
1945            htd = server.getTableDescriptors().get(region.getTable());
1946            htds.put(region.getTable(), htd);
1947          }
1948          if (htd == null) {
1949            throw new IOException("Missing table descriptor for " + region.getEncodedName());
1950          }
1951          // If there is no action in progress, we can submit a specific handler.
1952          // Need to pass the expected version in the constructor.
1953          if (server.getExecutorService() == null) {
1954            LOG.info("No executor executorService; skipping open request");
1955          } else {
1956            if (region.isMetaRegion()) {
1957              server.getExecutorService()
1958                .submit(new OpenMetaHandler(server, server, region, htd, masterSystemTime));
1959            } else {
1960              if (regionOpenInfo.getFavoredNodesCount() > 0) {
1961                server.updateRegionFavoredNodesMapping(region.getEncodedName(),
1962                  regionOpenInfo.getFavoredNodesList());
1963              }
1964              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
1965                server.getExecutorService().submit(
1966                  new OpenPriorityRegionHandler(server, server, region, htd, masterSystemTime));
1967              } else {
1968                server.getExecutorService()
1969                  .submit(new OpenRegionHandler(server, server, region, htd, masterSystemTime));
1970              }
1971            }
1972          }
1973        }
1974
1975        builder.addOpeningState(RegionOpeningState.OPENED);
1976      } catch (IOException ie) {
1977        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
1978        if (isBulkAssign) {
1979          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
1980        } else {
1981          throw new ServiceException(ie);
1982        }
1983      }
1984    }
1985    return builder.build();
1986  }
1987
1988  /**
1989   * Warmup a region on this server. This method should only be called by Master. It synchronously
1990   * opens the region and closes the region bringing the most important pages in cache.
1991   */
1992  @Override
1993  public WarmupRegionResponse warmupRegion(final RpcController controller,
1994    final WarmupRegionRequest request) throws ServiceException {
1995    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
1996    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
1997    try {
1998      checkOpen();
1999      String encodedName = region.getEncodedName();
2000      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
2001      final HRegion onlineRegion = server.getRegion(encodedName);
2002      if (onlineRegion != null) {
2003        LOG.info("{} is online; skipping warmup", region);
2004        return response;
2005      }
2006      TableDescriptor htd = server.getTableDescriptors().get(region.getTable());
2007      if (server.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
2008        LOG.info("{} is in transition; skipping warmup", region);
2009        return response;
2010      }
2011      LOG.info("Warmup {}", region.getRegionNameAsString());
2012      HRegion.warmupHRegion(region, htd, server.getWAL(region), server.getConfiguration(), server,
2013        null);
2014    } catch (IOException ie) {
2015      LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
2016      throw new ServiceException(ie);
2017    }
2018
2019    return response;
2020  }
2021
2022  private CellScanner getAndReset(RpcController controller) {
2023    HBaseRpcController hrc = (HBaseRpcController) controller;
2024    CellScanner cells = hrc.cellScanner();
2025    hrc.setCellScanner(null);
2026    return cells;
2027  }
2028
2029  /**
2030   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
2031   * that the given mutations will be durable on the receiving RS if this method returns without any
2032   * exception.
2033   * @param controller the RPC controller
2034   * @param request    the request
2035   * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for
2036   *             compatibility with old region replica implementation. Now we will use
2037   *             {@code replicateToReplica} method instead.
2038   */
2039  @Deprecated
2040  @Override
2041  @QosPriority(priority = HConstants.REPLAY_QOS)
2042  public ReplicateWALEntryResponse replay(final RpcController controller,
2043    final ReplicateWALEntryRequest request) throws ServiceException {
2044    long before = EnvironmentEdgeManager.currentTime();
2045    CellScanner cells = getAndReset(controller);
2046    try {
2047      checkOpen();
2048      List<WALEntry> entries = request.getEntryList();
2049      if (entries == null || entries.isEmpty()) {
2050        // empty input
2051        return ReplicateWALEntryResponse.newBuilder().build();
2052      }
2053      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
2054      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
2055      RegionCoprocessorHost coprocessorHost =
2056        ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
2057          ? region.getCoprocessorHost()
2058          : null; // do not invoke coprocessors if this is a secondary region replica
2059      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();
2060
2061      // Skip adding the edits to WAL if this is a secondary region replica
2062      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
2063      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
2064
2065      for (WALEntry entry : entries) {
2066        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
2067          throw new NotServingRegionException("Replay request contains entries from multiple "
2068            + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
2069            + entry.getKey().getEncodedRegionName());
2070        }
2071        if (server.nonceManager != null && isPrimary) {
2072          long nonceGroup =
2073            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
2074          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
2075          server.nonceManager.reportOperationFromWal(nonceGroup, nonce,
2076            entry.getKey().getWriteTime());
2077        }
2078        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
2079        List<MutationReplay> edits =
2080          WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
2081        if (coprocessorHost != null) {
2082          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
2083          // KeyValue.
2084          if (
2085            coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
2086              walEntry.getSecond())
2087          ) {
2088            // if bypass this log entry, ignore it ...
2089            continue;
2090          }
2091          walEntries.add(walEntry);
2092        }
2093        if (edits != null && !edits.isEmpty()) {
2094          // HBASE-17924
2095          // sort to improve lock efficiency
2096          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
2097          long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
2098            ? entry.getKey().getOrigSequenceNumber()
2099            : entry.getKey().getLogSequenceNumber();
2100          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
2101          // check if it's a partial success
2102          for (int i = 0; result != null && i < result.length; i++) {
2103            if (result[i] != OperationStatus.SUCCESS) {
2104              throw new IOException(result[i].getExceptionMsg());
2105            }
2106          }
2107        }
2108      }
2109
2110      // sync wal at the end because ASYNC_WAL is used above
2111      WAL wal = region.getWAL();
2112      if (wal != null) {
2113        wal.sync();
2114      }
2115
2116      if (coprocessorHost != null) {
2117        for (Pair<WALKey, WALEdit> entry : walEntries) {
2118          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
2119            entry.getSecond());
2120        }
2121      }
2122      return ReplicateWALEntryResponse.newBuilder().build();
2123    } catch (IOException ie) {
2124      throw new ServiceException(ie);
2125    } finally {
2126      final MetricsRegionServer metricsRegionServer = server.getMetrics();
2127      if (metricsRegionServer != null) {
2128        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
2129      }
2130    }
2131  }
2132
2133  /**
2134   * Replay the given changes on a secondary replica
2135   */
2136  @Override
2137  public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
2138    ReplicateWALEntryRequest request) throws ServiceException {
2139    CellScanner cells = getAndReset(controller);
2140    try {
2141      checkOpen();
2142      List<WALEntry> entries = request.getEntryList();
2143      if (entries == null || entries.isEmpty()) {
2144        // empty input
2145        return ReplicateWALEntryResponse.newBuilder().build();
2146      }
2147      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
2148      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
2149      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2150        throw new DoNotRetryIOException(
2151          "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
2152      }
2153      for (WALEntry entry : entries) {
2154        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
2155          throw new NotServingRegionException(
2156            "ReplicateToReplica request contains entries from multiple " + "regions. First region:"
2157              + regionName.toStringUtf8() + " , other region:"
2158              + entry.getKey().getEncodedRegionName());
2159        }
2160        region.replayWALEntry(entry, cells);
2161      }
2162      return ReplicateWALEntryResponse.newBuilder().build();
2163    } catch (IOException ie) {
2164      throw new ServiceException(ie);
2165    }
2166  }
2167
2168  private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
2169    ReplicationSourceService replicationSource = server.getReplicationSourceService();
2170    if (replicationSource == null || entries.isEmpty()) {
2171      return;
2172    }
2173    // We can ensure that all entries are for one peer, so only need to check one entry's
2174    // table name. if the table hit sync replication at peer side and the peer cluster
2175    // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply
2176    // those entries according to the design doc.
2177    TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray());
2178    if (
2179      replicationSource.getSyncReplicationPeerInfoProvider().checkState(table,
2180        RejectReplicationRequestStateChecker.get())
2181    ) {
2182      throw new DoNotRetryIOException(
2183        "Reject to apply to sink cluster because sync replication state of sink cluster "
2184          + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table);
2185    }
2186  }
2187
2188  /**
2189   * Replicate WAL entries on the region server.
2190   * @param controller the RPC controller
2191   * @param request    the request
2192   */
2193  @Override
2194  @QosPriority(priority = HConstants.REPLICATION_QOS)
2195  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
2196    final ReplicateWALEntryRequest request) throws ServiceException {
2197    try {
2198      checkOpen();
2199      if (server.getReplicationSinkService() != null) {
2200        requestCount.increment();
2201        List<WALEntry> entries = request.getEntryList();
2202        checkShouldRejectReplicationRequest(entries);
2203        CellScanner cellScanner = getAndReset(controller);
2204        server.getRegionServerCoprocessorHost().preReplicateLogEntries();
2205        server.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
2206          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
2207          request.getSourceHFileArchiveDirPath());
2208        server.getRegionServerCoprocessorHost().postReplicateLogEntries();
2209        return ReplicateWALEntryResponse.newBuilder().build();
2210      } else {
2211        throw new ServiceException("Replication services are not initialized yet");
2212      }
2213    } catch (IOException ie) {
2214      throw new ServiceException(ie);
2215    }
2216  }
2217
2218  /**
2219   * Roll the WAL writer of the region server.
2220   * @param controller the RPC controller
2221   * @param request    the request
2222   */
2223  @Override
2224  public RollWALWriterResponse rollWALWriter(final RpcController controller,
2225    final RollWALWriterRequest request) throws ServiceException {
2226    try {
2227      checkOpen();
2228      requestCount.increment();
2229      server.getRegionServerCoprocessorHost().preRollWALWriterRequest();
2230      server.getWalRoller().requestRollAll();
2231      server.getRegionServerCoprocessorHost().postRollWALWriterRequest();
2232      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
2233      return builder.build();
2234    } catch (IOException ie) {
2235      throw new ServiceException(ie);
2236    }
2237  }
2238
2239  /**
2240   * Stop the region server.
2241   * @param controller the RPC controller
2242   * @param request    the request
2243   */
2244  @Override
2245  @QosPriority(priority = HConstants.ADMIN_QOS)
2246  public StopServerResponse stopServer(final RpcController controller,
2247    final StopServerRequest request) throws ServiceException {
2248    rpcPreCheck("stopServer");
2249    requestCount.increment();
2250    String reason = request.getReason();
2251    server.stop(reason);
2252    return StopServerResponse.newBuilder().build();
2253  }
2254
2255  @Override
2256  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
2257    UpdateFavoredNodesRequest request) throws ServiceException {
2258    rpcPreCheck("updateFavoredNodes");
2259    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
2260    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
2261    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
2262      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
2263      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
2264        server.updateRegionFavoredNodesMapping(hri.getEncodedName(),
2265          regionUpdateInfo.getFavoredNodesList());
2266      }
2267    }
2268    respBuilder.setResponse(openInfoList.size());
2269    return respBuilder.build();
2270  }
2271
2272  /**
2273   * Atomically bulk load several HFiles into an open region
2274   * @return true if successful, false is failed but recoverably (no action)
2275   * @throws ServiceException if failed unrecoverably
2276   */
2277  @Override
2278  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
2279    final BulkLoadHFileRequest request) throws ServiceException {
2280    long start = EnvironmentEdgeManager.currentTime();
2281    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
2282    if (clusterIds.contains(this.server.getClusterId())) {
2283      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
2284    } else {
2285      clusterIds.add(this.server.getClusterId());
2286    }
2287    try {
2288      checkOpen();
2289      requestCount.increment();
2290      HRegion region = getRegion(request.getRegion());
2291      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
2292      long sizeToBeLoaded = -1;
2293
2294      // Check to see if this bulk load would exceed the space quota for this table
2295      if (spaceQuotaEnabled) {
2296        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
2297        SpaceViolationPolicyEnforcement enforcement =
2298          activeSpaceQuotas.getPolicyEnforcement(region);
2299        if (enforcement != null) {
2300          // Bulk loads must still be atomic. We must enact all or none.
2301          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
2302          for (FamilyPath familyPath : request.getFamilyPathList()) {
2303            filePaths.add(familyPath.getPath());
2304          }
2305          // Check if the batch of files exceeds the current quota
2306          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
2307        }
2308      }
2309      // secure bulk load
2310      Map<byte[], List<Path>> map =
2311        server.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
2312      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
2313      builder.setLoaded(map != null);
2314      if (map != null) {
2315        // Treat any negative size as a flag to "ignore" updating the region size as that is
2316        // not possible to occur in real life (cannot bulk load a file with negative size)
2317        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
2318          if (LOG.isTraceEnabled()) {
2319            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
2320              + sizeToBeLoaded + " bytes");
2321          }
2322          // Inform space quotas of the new files for this region
2323          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
2324            sizeToBeLoaded);
2325        }
2326      }
2327      return builder.build();
2328    } catch (IOException ie) {
2329      throw new ServiceException(ie);
2330    } finally {
2331      final MetricsRegionServer metricsRegionServer = server.getMetrics();
2332      if (metricsRegionServer != null) {
2333        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
2334      }
2335    }
2336  }
2337
2338  @Override
2339  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
2340    PrepareBulkLoadRequest request) throws ServiceException {
2341    try {
2342      checkOpen();
2343      requestCount.increment();
2344
2345      HRegion region = getRegion(request.getRegion());
2346
2347      String bulkToken = server.getSecureBulkLoadManager().prepareBulkLoad(region, request);
2348      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
2349      builder.setBulkToken(bulkToken);
2350      return builder.build();
2351    } catch (IOException ie) {
2352      throw new ServiceException(ie);
2353    }
2354  }
2355
2356  @Override
2357  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
2358    CleanupBulkLoadRequest request) throws ServiceException {
2359    try {
2360      checkOpen();
2361      requestCount.increment();
2362
2363      HRegion region = getRegion(request.getRegion());
2364
2365      server.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
2366      return CleanupBulkLoadResponse.newBuilder().build();
2367    } catch (IOException ie) {
2368      throw new ServiceException(ie);
2369    }
2370  }
2371
2372  @Override
2373  public CoprocessorServiceResponse execService(final RpcController controller,
2374    final CoprocessorServiceRequest request) throws ServiceException {
2375    try {
2376      checkOpen();
2377      requestCount.increment();
2378      HRegion region = getRegion(request.getRegion());
2379      Message result = execServiceOnRegion(region, request.getCall());
2380      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
2381      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
2382        region.getRegionInfo().getRegionName()));
2383      // TODO: COPIES!!!!!!
2384      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
2385        org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
2386      return builder.build();
2387    } catch (IOException ie) {
2388      throw new ServiceException(ie);
2389    }
2390  }
2391
2392  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
2393    if (filePaths.isEmpty()) {
2394      // local hdfs
2395      return server.getFileSystem();
2396    }
2397    // source hdfs
2398    return new Path(filePaths.get(0)).getFileSystem(server.getConfiguration());
2399  }
2400
2401  private Message execServiceOnRegion(HRegion region,
2402    final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
2403    // ignore the passed in controller (from the serialized call)
2404    ServerRpcController execController = new ServerRpcController();
2405    return region.execService(execController, serviceCall);
2406  }
2407
2408  private boolean shouldRejectRequestsFromClient(HRegion region) {
2409    TableName table = region.getRegionInfo().getTable();
2410    ReplicationSourceService service = server.getReplicationSourceService();
2411    return service != null && service.getSyncReplicationPeerInfoProvider().checkState(table,
2412      RejectRequestsFromClientStateChecker.get());
2413  }
2414
2415  private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
2416    if (shouldRejectRequestsFromClient(region)) {
2417      throw new DoNotRetryIOException(
2418        region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state.");
2419    }
2420  }
2421
2422  /**
2423   * Get data from a table.
2424   * @param controller the RPC controller
2425   * @param request    the get request
2426   */
2427  @Override
2428  public GetResponse get(final RpcController controller, final GetRequest request)
2429    throws ServiceException {
2430    long before = EnvironmentEdgeManager.currentTime();
2431    OperationQuota quota = null;
2432    HRegion region = null;
2433    try {
2434      checkOpen();
2435      requestCount.increment();
2436      rpcGetRequestCount.increment();
2437      region = getRegion(request.getRegion());
2438      rejectIfInStandByState(region);
2439
2440      GetResponse.Builder builder = GetResponse.newBuilder();
2441      ClientProtos.Get get = request.getGet();
2442      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
2443      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
2444      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
2445      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
2446      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
2447        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
2448          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
2449          + "reverse Scan.");
2450      }
2451      Boolean existence = null;
2452      Result r = null;
2453      RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2454      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
2455
2456      Get clientGet = ProtobufUtil.toGet(get);
2457      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2458        existence = region.getCoprocessorHost().preExists(clientGet);
2459      }
2460      if (existence == null) {
2461        if (context != null) {
2462          r = get(clientGet, (region), null, context);
2463        } else {
2464          // for test purpose
2465          r = region.get(clientGet);
2466        }
2467        if (get.getExistenceOnly()) {
2468          boolean exists = r.getExists();
2469          if (region.getCoprocessorHost() != null) {
2470            exists = region.getCoprocessorHost().postExists(clientGet, exists);
2471          }
2472          existence = exists;
2473        }
2474      }
2475      if (existence != null) {
2476        ClientProtos.Result pbr =
2477          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
2478        builder.setResult(pbr);
2479      } else if (r != null) {
2480        ClientProtos.Result pbr;
2481        if (
2482          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
2483            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
2484        ) {
2485          pbr = ProtobufUtil.toResultNoData(r);
2486          ((HBaseRpcController) controller)
2487            .setCellScanner(CellUtil.createCellScanner(r.rawCells()));
2488          addSize(context, r);
2489        } else {
2490          pbr = ProtobufUtil.toResult(r);
2491        }
2492        builder.setResult(pbr);
2493      }
2494      // r.cells is null when an table.exists(get) call
2495      if (r != null && r.rawCells() != null) {
2496        quota.addGetResult(r);
2497      }
2498      return builder.build();
2499    } catch (IOException ie) {
2500      throw new ServiceException(ie);
2501    } finally {
2502      final MetricsRegionServer metricsRegionServer = server.getMetrics();
2503      if (metricsRegionServer != null) {
2504        TableDescriptor td = region != null ? region.getTableDescriptor() : null;
2505        if (td != null) {
2506          metricsRegionServer.updateGet(td.getTableName(),
2507            EnvironmentEdgeManager.currentTime() - before);
2508        }
2509      }
2510      if (quota != null) {
2511        quota.close();
2512      }
2513    }
2514  }
2515
  /**
   * Server-side execution of a single Get, implemented as a one-row Scan so the resulting
   * scanner can be closed later via the RPC callback machinery rather than inline.
   * @param get           the get to execute
   * @param region        region to read from
   * @param closeCallBack scanner aggregator used by multi(); when null the scanner is registered
   *                      on {@code context} instead, so {@code context} must be non-null in that
   *                      case
   * @param context       current RPC call context, used to close the scanner when no
   *                      {@code closeCallBack} is supplied
   * @return the result of the get, flagged stale when served by a non-default replica
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
    RpcCallContext context) throws IOException {
    region.prepareGet(get);
    // Results from a secondary replica are marked stale.
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    long before = EnvironmentEdgeManager.currentTime();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        // Coprocessor bypassed the read; return whatever cells it populated.
        region.metricsUpdateForGet(results, before);
        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
          stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet(results, before);

    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
  }
2566
2567  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
2568    int sum = 0;
2569    String firstRegionName = null;
2570    for (RegionAction regionAction : request.getRegionActionList()) {
2571      if (sum == 0) {
2572        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
2573      }
2574      sum += regionAction.getActionCount();
2575    }
2576    if (sum > rowSizeWarnThreshold) {
2577      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
2578        + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
2579        + RpcServer.getRequestUserName().orElse(null) + "/"
2580        + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
2581      if (rejectRowsWithSizeOverThreshold) {
2582        throw new ServiceException(
2583          "Rejecting large batch operation for current batch with firstRegionName: "
2584            + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
2585            + rowSizeWarnThreshold);
2586      }
2587    }
2588  }
2589
2590  private void failRegionAction(MultiResponse.Builder responseBuilder,
2591    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
2592    CellScanner cellScanner, Throwable error) {
2593    rpcServer.getMetrics().exception(error);
2594    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
2595    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2596    // All Mutations in this RegionAction not executed as we can not see the Region online here
2597    // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
2598    // corresponding to these Mutations.
2599    if (cellScanner != null) {
2600      skipCellsForMutations(regionAction.getActionList(), cellScanner);
2601    }
2602  }
2603
2604  private boolean isReplicationRequest(Action action) {
2605    // replication request can only be put or delete.
2606    if (!action.hasMutation()) {
2607      return false;
2608    }
2609    MutationProto mutation = action.getMutation();
2610    MutationType type = mutation.getMutateType();
2611    if (type != MutationType.PUT && type != MutationType.DELETE) {
2612      return false;
2613    }
2614    // replication will set a special attribute so we can make use of it to decide whether a request
2615    // is for replication.
2616    return mutation.getAttributeList().stream().map(p -> p.getName())
2617      .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
2618  }
2619
2620  /**
2621   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2622   * @param rpcc    the RPC controller
2623   * @param request the multi request
2624   */
2625  @Override
2626  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2627    throws ServiceException {
2628    try {
2629      checkOpen();
2630    } catch (IOException ie) {
2631      throw new ServiceException(ie);
2632    }
2633
2634    checkBatchSizeAndLogLargeSize(request);
2635
2636    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2637    // It is also the conduit via which we pass back data.
2638    HBaseRpcController controller = (HBaseRpcController) rpcc;
2639    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2640    if (controller != null) {
2641      controller.setCellScanner(null);
2642    }
2643
2644    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2645
2646    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2647    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2648    this.rpcMultiRequestCount.increment();
2649    this.requestCount.increment();
2650    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2651
2652    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
2653    // following logic is for backward compatibility as old clients still use
2654    // MultiRequest#condition in case of checkAndMutate with RowMutations.
2655    if (request.hasCondition()) {
2656      if (request.getRegionActionList().isEmpty()) {
2657        // If the region action list is empty, do nothing.
2658        responseBuilder.setProcessed(true);
2659        return responseBuilder.build();
2660      }
2661
2662      RegionAction regionAction = request.getRegionAction(0);
2663
2664      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
2665      // we can assume regionAction.getAtomic() is true here.
2666      assert regionAction.getAtomic();
2667
2668      OperationQuota quota;
2669      HRegion region;
2670      RegionSpecifier regionSpecifier = regionAction.getRegion();
2671
2672      try {
2673        region = getRegion(regionSpecifier);
2674        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
2675      } catch (IOException e) {
2676        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2677        return responseBuilder.build();
2678      }
2679
2680      try {
2681        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
2682        // We only allow replication in standby state and it will not set the atomic flag.
2683        if (rejectIfFromClient) {
2684          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2685            new DoNotRetryIOException(
2686              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2687          return responseBuilder.build();
2688        }
2689
2690        try {
2691          CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2692            cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
2693          responseBuilder.setProcessed(result.isSuccess());
2694          ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2695            ClientProtos.ResultOrException.newBuilder();
2696          for (int i = 0; i < regionAction.getActionCount(); i++) {
2697            // To unify the response format with doNonAtomicRegionMutation and read through
2698            // client's AsyncProcess we have to add an empty result instance per operation
2699            resultOrExceptionOrBuilder.clear();
2700            resultOrExceptionOrBuilder.setIndex(i);
2701            regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2702          }
2703        } catch (IOException e) {
2704          rpcServer.getMetrics().exception(e);
2705          // As it's an atomic operation with a condition, we may expect it's a global failure.
2706          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2707        }
2708      } finally {
2709        quota.close();
2710      }
2711
2712      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2713      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2714      if (regionLoadStats != null) {
2715        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
2716          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
2717      }
2718      return responseBuilder.build();
2719    }
2720
2721    // this will contain all the cells that we need to return. It's created later, if needed.
2722    List<CellScannable> cellsToReturn = null;
2723    RegionScannersCloseCallBack closeCallBack = null;
2724    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2725    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
2726      new HashMap<>(request.getRegionActionCount());
2727
2728    for (RegionAction regionAction : request.getRegionActionList()) {
2729      OperationQuota quota;
2730      HRegion region;
2731      RegionSpecifier regionSpecifier = regionAction.getRegion();
2732      regionActionResultBuilder.clear();
2733
2734      try {
2735        region = getRegion(regionSpecifier);
2736        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
2737      } catch (IOException e) {
2738        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2739        continue; // For this region it's a failure.
2740      }
2741
2742      try {
2743        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
2744
2745        if (regionAction.hasCondition()) {
2746          // We only allow replication in standby state and it will not set the atomic flag.
2747          if (rejectIfFromClient) {
2748            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2749              new DoNotRetryIOException(
2750                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2751            continue;
2752          }
2753
2754          try {
2755            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2756              ClientProtos.ResultOrException.newBuilder();
2757            if (regionAction.getActionCount() == 1) {
2758              CheckAndMutateResult result =
2759                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
2760                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
2761              regionActionResultBuilder.setProcessed(result.isSuccess());
2762              resultOrExceptionOrBuilder.setIndex(0);
2763              if (result.getResult() != null) {
2764                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
2765              }
2766              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2767            } else {
2768              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2769                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
2770              regionActionResultBuilder.setProcessed(result.isSuccess());
2771              for (int i = 0; i < regionAction.getActionCount(); i++) {
2772                if (i == 0 && result.getResult() != null) {
2773                  // Set the result of the Increment/Append operations to the first element of the
2774                  // ResultOrException list
2775                  resultOrExceptionOrBuilder.setIndex(i);
2776                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
2777                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
2778                  continue;
2779                }
2780                // To unify the response format with doNonAtomicRegionMutation and read through
2781                // client's AsyncProcess we have to add an empty result instance per operation
2782                resultOrExceptionOrBuilder.clear();
2783                resultOrExceptionOrBuilder.setIndex(i);
2784                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2785              }
2786            }
2787          } catch (IOException e) {
2788            rpcServer.getMetrics().exception(e);
2789            // As it's an atomic operation with a condition, we may expect it's a global failure.
2790            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2791          }
2792        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2793          // We only allow replication in standby state and it will not set the atomic flag.
2794          if (rejectIfFromClient) {
2795            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2796              new DoNotRetryIOException(
2797                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2798            continue;
2799          }
2800          try {
2801            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
2802              cellScanner, nonceGroup, spaceQuotaEnforcement);
2803            regionActionResultBuilder.setProcessed(true);
2804            // We no longer use MultiResponse#processed. Instead, we use
2805            // RegionActionResult#processed. This is for backward compatibility for old clients.
2806            responseBuilder.setProcessed(true);
2807          } catch (IOException e) {
2808            rpcServer.getMetrics().exception(e);
2809            // As it's atomic, we may expect it's a global failure.
2810            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2811          }
2812        } else {
2813          if (
2814            rejectIfFromClient && regionAction.getActionCount() > 0
2815              && !isReplicationRequest(regionAction.getAction(0))
2816          ) {
2817            // fail if it is not a replication request
2818            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2819              new DoNotRetryIOException(
2820                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2821            continue;
2822          }
2823          // doNonAtomicRegionMutation manages the exception internally
2824          if (context != null && closeCallBack == null) {
2825            // An RpcCallBack that creates a list of scanners that needs to perform callBack
2826            // operation on completion of multiGets.
2827            // Set this only once
2828            closeCallBack = new RegionScannersCloseCallBack();
2829            context.setCallBack(closeCallBack);
2830          }
2831          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2832            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
2833            spaceQuotaEnforcement);
2834        }
2835      } finally {
2836        quota.close();
2837      }
2838
2839      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2840      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2841      if (regionLoadStats != null) {
2842        regionStats.put(regionSpecifier, regionLoadStats);
2843      }
2844    }
2845    // Load the controller with the Cells to return.
2846    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2847      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2848    }
2849
2850    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
2851    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
2852      builder.addRegion(stat.getKey());
2853      builder.addStat(stat.getValue());
2854    }
2855    responseBuilder.setRegionStatistics(builder);
2856    return responseBuilder.build();
2857  }
2858
2859  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2860    if (cellScanner == null) {
2861      return;
2862    }
2863    for (Action action : actions) {
2864      skipCellsForMutation(action, cellScanner);
2865    }
2866  }
2867
2868  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2869    if (cellScanner == null) {
2870      return;
2871    }
2872    try {
2873      if (action.hasMutation()) {
2874        MutationProto m = action.getMutation();
2875        if (m.hasAssociatedCellCount()) {
2876          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2877            cellScanner.advance();
2878          }
2879        }
2880      }
2881    } catch (IOException e) {
2882      // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2883      // marked as failed as we could not see the Region here. At client side the top level
2884      // RegionAction exception will be considered first.
2885      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2886    }
2887  }
2888
2889  /**
2890   * Mutate data in a table.
2891   * @param rpcc    the RPC controller
2892   * @param request the mutate request
2893   */
2894  @Override
2895  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
2896    throws ServiceException {
2897    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2898    // It is also the conduit via which we pass back data.
2899    HBaseRpcController controller = (HBaseRpcController) rpcc;
2900    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2901    OperationQuota quota = null;
2902    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2903    // Clear scanner so we are not holding on to reference across call.
2904    if (controller != null) {
2905      controller.setCellScanner(null);
2906    }
2907    try {
2908      checkOpen();
2909      requestCount.increment();
2910      rpcMutateRequestCount.increment();
2911      HRegion region = getRegion(request.getRegion());
2912      rejectIfInStandByState(region);
2913      MutateResponse.Builder builder = MutateResponse.newBuilder();
2914      MutationProto mutation = request.getMutation();
2915      if (!region.getRegionInfo().isMetaRegion()) {
2916        server.getMemStoreFlusher().reclaimMemStoreMemory();
2917      }
2918      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2919      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
2920      ActivePolicyEnforcement spaceQuotaEnforcement =
2921        getSpaceQuotaManager().getActiveEnforcements();
2922
2923      if (request.hasCondition()) {
2924        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
2925          request.getCondition(), nonceGroup, spaceQuotaEnforcement);
2926        builder.setProcessed(result.isSuccess());
2927        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
2928        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
2929        if (clientCellBlockSupported) {
2930          addSize(context, result.getResult());
2931        }
2932      } else {
2933        Result r = null;
2934        Boolean processed = null;
2935        MutationType type = mutation.getMutateType();
2936        switch (type) {
2937          case APPEND:
2938            // TODO: this doesn't actually check anything.
2939            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
2940            break;
2941          case INCREMENT:
2942            // TODO: this doesn't actually check anything.
2943            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
2944            break;
2945          case PUT:
2946            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
2947            processed = Boolean.TRUE;
2948            break;
2949          case DELETE:
2950            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
2951            processed = Boolean.TRUE;
2952            break;
2953          default:
2954            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
2955        }
2956        if (processed != null) {
2957          builder.setProcessed(processed);
2958        }
2959        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
2960        addResult(builder, r, controller, clientCellBlockSupported);
2961        if (clientCellBlockSupported) {
2962          addSize(context, r);
2963        }
2964      }
2965      return builder.build();
2966    } catch (IOException ie) {
2967      server.checkFileSystem();
2968      throw new ServiceException(ie);
2969    } finally {
2970      if (quota != null) {
2971        quota.close();
2972      }
2973    }
2974  }
2975
2976  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
2977    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
2978    long before = EnvironmentEdgeManager.currentTime();
2979    Put put = ProtobufUtil.toPut(mutation, cellScanner);
2980    checkCellSizeLimit(region, put);
2981    spaceQuota.getPolicyEnforcement(region).check(put);
2982    quota.addMutation(put);
2983    region.put(put);
2984
2985    MetricsRegionServer metricsRegionServer = server.getMetrics();
2986    if (metricsRegionServer != null) {
2987      long after = EnvironmentEdgeManager.currentTime();
2988      metricsRegionServer.updatePut(region.getRegionInfo().getTable(), after - before);
2989    }
2990  }
2991
2992  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
2993    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
2994    long before = EnvironmentEdgeManager.currentTime();
2995    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
2996    checkCellSizeLimit(region, delete);
2997    spaceQuota.getPolicyEnforcement(region).check(delete);
2998    quota.addMutation(delete);
2999    region.delete(delete);
3000
3001    MetricsRegionServer metricsRegionServer = server.getMetrics();
3002    if (metricsRegionServer != null) {
3003      long after = EnvironmentEdgeManager.currentTime();
3004      metricsRegionServer.updateDelete(region.getRegionInfo().getTable(), after - before);
3005    }
3006  }
3007
3008  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
3009    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
3010    ActivePolicyEnforcement spaceQuota) throws IOException {
3011    long before = EnvironmentEdgeManager.currentTime();
3012    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
3013    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
3014    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
3015    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
3016    quota.addMutation((Mutation) checkAndMutate.getAction());
3017
3018    CheckAndMutateResult result = null;
3019    if (region.getCoprocessorHost() != null) {
3020      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
3021    }
3022    if (result == null) {
3023      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
3024      if (region.getCoprocessorHost() != null) {
3025        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
3026      }
3027    }
3028    MetricsRegionServer metricsRegionServer = server.getMetrics();
3029    if (metricsRegionServer != null) {
3030      long after = EnvironmentEdgeManager.currentTime();
3031      metricsRegionServer.updateCheckAndMutate(region.getRegionInfo().getTable(), after - before);
3032
3033      MutationType type = mutation.getMutateType();
3034      switch (type) {
3035        case PUT:
3036          metricsRegionServer.updateCheckAndPut(region.getRegionInfo().getTable(), after - before);
3037          break;
3038        case DELETE:
3039          metricsRegionServer.updateCheckAndDelete(region.getRegionInfo().getTable(),
3040            after - before);
3041          break;
3042        default:
3043          break;
3044      }
3045    }
3046    return result;
3047  }
3048
  // This is used to keep compatible with the old client implementation. Consider remove it if we
  // decide to drop the support of the client that still sends close request to a region scanner
  // which has already been exhausted.
  @Deprecated
  private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {

    private static final long serialVersionUID = -4305297078988180130L;

    // This is a shared singleton exception: suppress stack-trace capture so throwing it is cheap
    // and does not expose a misleading stack recorded at class-initialization time.
    @Override
    public synchronized Throwable fillInStackTrace() {
      return this;
    }
  };
3062
3063  private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
3064    String scannerName = toScannerName(request.getScannerId());
3065    RegionScannerHolder rsh = this.scanners.get(scannerName);
3066    if (rsh == null) {
3067      // just ignore the next or close request if scanner does not exists.
3068      if (closedScanners.getIfPresent(scannerName) != null) {
3069        throw SCANNER_ALREADY_CLOSED;
3070      } else {
3071        LOG.warn("Client tried to access missing scanner " + scannerName);
3072        throw new UnknownScannerException(
3073          "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
3074            + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
3075            + "long wait between consecutive client checkins, c) Server may be closing down, "
3076            + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
3077            + "possible fix would be increasing the value of"
3078            + "'hbase.client.scanner.timeout.period' configuration.");
3079      }
3080    }
3081    rejectIfInStandByState(rsh.r);
3082    RegionInfo hri = rsh.s.getRegionInfo();
3083    // Yes, should be the same instance
3084    if (server.getOnlineRegion(hri.getRegionName()) != rsh.r) {
3085      String msg = "Region has changed on the scanner " + scannerName + ": regionName="
3086        + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r;
3087      LOG.warn(msg + ", closing...");
3088      scanners.remove(scannerName);
3089      try {
3090        rsh.s.close();
3091      } catch (IOException e) {
3092        LOG.warn("Getting exception closing " + scannerName, e);
3093      } finally {
3094        try {
3095          server.getLeaseManager().cancelLease(scannerName);
3096        } catch (LeaseException e) {
3097          LOG.warn("Getting exception closing " + scannerName, e);
3098        }
3099      }
3100      throw new NotServingRegionException(msg);
3101    }
3102    return rsh;
3103  }
3104
3105  /**
3106   * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
3107   *         value.
3108   */
3109  private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request,
3110    ScanResponse.Builder builder) throws IOException {
3111    HRegion region = getRegion(request.getRegion());
3112    rejectIfInStandByState(region);
3113    ClientProtos.Scan protoScan = request.getScan();
3114    boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
3115    Scan scan = ProtobufUtil.toScan(protoScan);
3116    // if the request doesn't set this, get the default region setting.
3117    if (!isLoadingCfsOnDemandSet) {
3118      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
3119    }
3120
3121    if (!scan.hasFamilies()) {
3122      // Adding all families to scanner
3123      for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
3124        scan.addFamily(family);
3125      }
3126    }
3127    if (region.getCoprocessorHost() != null) {
3128      // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
3129      // wrapper for the core created RegionScanner
3130      region.getCoprocessorHost().preScannerOpen(scan);
3131    }
3132    RegionScannerImpl coreScanner = region.getScanner(scan);
3133    Shipper shipper = coreScanner;
3134    RegionScanner scanner = coreScanner;
3135    try {
3136      if (region.getCoprocessorHost() != null) {
3137        scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
3138      }
3139    } catch (Exception e) {
3140      // Although region coprocessor is for advanced users and they should take care of the
3141      // implementation to not damage the HBase system, closing the scanner on exception here does
3142      // not have any bad side effect, so let's do it
3143      scanner.close();
3144      throw e;
3145    }
3146    long scannerId = scannerIdGenerator.generateNewScannerId();
3147    builder.setScannerId(scannerId);
3148    builder.setMvccReadPoint(scanner.getMvccReadPoint());
3149    builder.setTtl(scannerLeaseTimeoutPeriod);
3150    String scannerName = toScannerName(scannerId);
3151
3152    boolean fullRegionScan =
3153      !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region);
3154
3155    return new Pair<String, RegionScannerHolder>(scannerName,
3156      addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan));
3157  }
3158
3159  /**
3160   * The returned String is used as key doing look up of outstanding Scanners in this Servers'
3161   * this.scanners, the Map of outstanding scanners and their current state.
3162   * @param scannerId A scanner long id.
3163   * @return The long id as a String.
3164   */
3165  private static String toScannerName(long scannerId) {
3166    return Long.toString(scannerId);
3167  }
3168
3169  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
3170    throws OutOfOrderScannerNextException {
3171    // if nextCallSeq does not match throw Exception straight away. This needs to be
3172    // performed even before checking of Lease.
3173    // See HBASE-5974
3174    if (request.hasNextCallSeq()) {
3175      long callSeq = request.getNextCallSeq();
3176      if (!rsh.incNextCallSeq(callSeq)) {
3177        throw new OutOfOrderScannerNextException(
3178          "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: "
3179            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
3180      }
3181    }
3182  }
3183
3184  private void addScannerLeaseBack(LeaseManager.Lease lease) {
3185    try {
3186      server.getLeaseManager().addLease(lease);
3187    } catch (LeaseStillHeldException e) {
3188      // should not happen as the scanner id is unique.
3189      throw new AssertionError(e);
3190    }
3191  }
3192
3193  // visible for testing only
3194  long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller,
3195    boolean allowHeartbeatMessages) {
3196    // Set the time limit to be half of the more restrictive timeout value (one of the
3197    // timeout values must be positive). In the event that both values are positive, the
3198    // more restrictive of the two is used to calculate the limit.
3199    if (allowHeartbeatMessages) {
3200      long now = EnvironmentEdgeManager.currentTime();
3201      long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now);
3202      if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) {
3203        long timeLimitDelta;
3204        if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) {
3205          timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout);
3206        } else {
3207          timeLimitDelta =
3208            scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout;
3209        }
3210
3211        // Use half of whichever timeout value was more restrictive... But don't allow
3212        // the time limit to be less than the allowable minimum (could cause an
3213        // immediate timeout before scanning any data).
3214        timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
3215        return now + timeLimitDelta;
3216      }
3217    }
3218    // Default value of timeLimit is negative to indicate no timeLimit should be
3219    // enforced.
3220    return -1L;
3221  }
3222
3223  private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) {
3224    long timeout;
3225    if (controller != null && controller.getCallTimeout() > 0) {
3226      timeout = controller.getCallTimeout();
3227    } else if (rpcTimeout > 0) {
3228      timeout = rpcTimeout;
3229    } else {
3230      return -1;
3231    }
3232    if (call != null) {
3233      timeout -= (now - call.getReceiveTime());
3234    }
3235    // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high.
3236    // return minimum value here in that case so we count this in calculating the final delta.
3237    return Math.max(minimumScanTimeLimitDelta, timeout);
3238  }
3239
3240  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
3241    ScannerContext scannerContext, ScanResponse.Builder builder) {
3242    if (numOfCompleteRows >= limitOfRows) {
3243      if (LOG.isTraceEnabled()) {
3244        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows
3245          + " scannerContext: " + scannerContext);
3246      }
3247      builder.setMoreResults(false);
3248    }
3249  }
3250
3251  // return whether we have more results in region.
3252  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
3253    long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
3254    ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
3255    HRegion region = rsh.r;
3256    RegionScanner scanner = rsh.s;
3257    long maxResultSize;
3258    if (scanner.getMaxResultSize() > 0) {
3259      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
3260    } else {
3261      maxResultSize = maxQuotaResultSize;
3262    }
3263    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
3264    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
3265    // arbitrary 32. TODO: keep record of general size of results being returned.
3266    ArrayList<Cell> values = new ArrayList<>(32);
3267    region.startRegionOperation(Operation.SCAN);
3268    long before = EnvironmentEdgeManager.currentTime();
3269    // Used to check if we've matched the row limit set on the Scan
3270    int numOfCompleteRows = 0;
3271    // Count of times we call nextRaw; can be > numOfCompleteRows.
3272    int numOfNextRawCalls = 0;
3273    try {
3274      int numOfResults = 0;
3275      synchronized (scanner) {
3276        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
3277        boolean clientHandlesPartials =
3278          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
3279        boolean clientHandlesHeartbeats =
3280          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
3281
3282        // On the server side we must ensure that the correct ordering of partial results is
3283        // returned to the client to allow them to properly reconstruct the partial results.
3284        // If the coprocessor host is adding to the result list, we cannot guarantee the
3285        // correct ordering of partial results and so we prevent partial results from being
3286        // formed.
3287        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
3288        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
3289        boolean moreRows = false;
3290
3291        // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
3292        // certain time threshold on the server. When the time threshold is exceeded, the
3293        // server stops the scan and sends back whatever Results it has accumulated within
3294        // that time period (may be empty). Since heartbeat messages have the potential to
3295        // create partial Results (in the event that the timeout occurs in the middle of a
3296        // row), we must only generate heartbeat messages when the client can handle both
3297        // heartbeats AND partials
3298        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
3299
3300        long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);
3301
3302        final LimitScope sizeScope =
3303          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
3304        final LimitScope timeScope =
3305          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
3306
3307        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
3308
3309        // Configure with limits for this RPC. Set keep progress true since size progress
3310        // towards size limit should be kept between calls to nextRaw
3311        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
3312        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
3313        // of heap size occupied by cells(being read). Cell data means its key and value parts.
3314        // maxQuotaResultSize - max results just from server side configuration and quotas, without
3315        // user's specified max. We use this for evaluating limits based on blocks (not cells).
3316        // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
3317        // cell or block size from maximum here so we adhere to total limits of request.
3318        // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
3319        // have accumulated block bytes. If not, this will be 0 for block size.
3320        long maxCellSize = maxResultSize;
3321        long maxBlockSize = maxQuotaResultSize;
3322        if (rpcCall != null) {
3323          maxBlockSize -= rpcCall.getResponseBlockSize();
3324          maxCellSize -= rpcCall.getResponseCellSize();
3325        }
3326
3327        contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
3328        contextBuilder.setBatchLimit(scanner.getBatch());
3329        contextBuilder.setTimeLimit(timeScope, timeLimit);
3330        contextBuilder.setTrackMetrics(trackMetrics);
3331        ScannerContext scannerContext = contextBuilder.build();
3332        boolean limitReached = false;
3333        while (numOfResults < maxResults) {
3334          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
3335          // batch limit is a limit on the number of cells per Result. Thus, if progress is
3336          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
3337          // reset the batch progress between nextRaw invocations since we don't want the
3338          // batch progress from previous calls to affect future calls
3339          scannerContext.setBatchProgress(0);
3340          assert values.isEmpty();
3341
3342          // Collect values to be returned here
3343          moreRows = scanner.nextRaw(values, scannerContext);
3344          if (rpcCall == null) {
3345            // When there is no RpcCallContext,copy EC to heap, then the scanner would close,
3346            // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
3347            // buffers.See more details in HBASE-26036.
3348            CellUtil.cloneIfNecessary(values);
3349          }
3350          numOfNextRawCalls++;
3351
3352          if (!values.isEmpty()) {
3353            if (limitOfRows > 0) {
3354              // First we need to check if the last result is partial and we have a row change. If
3355              // so then we need to increase the numOfCompleteRows.
3356              if (results.isEmpty()) {
3357                if (
3358                  rsh.rowOfLastPartialResult != null
3359                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
3360                ) {
3361                  numOfCompleteRows++;
3362                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
3363                    builder);
3364                }
3365              } else {
3366                Result lastResult = results.get(results.size() - 1);
3367                if (
3368                  lastResult.mayHaveMoreCellsInRow()
3369                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
3370                ) {
3371                  numOfCompleteRows++;
3372                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
3373                    builder);
3374                }
3375              }
3376              if (builder.hasMoreResults() && !builder.getMoreResults()) {
3377                break;
3378              }
3379            }
3380            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
3381            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
3382            results.add(r);
3383            numOfResults++;
3384            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
3385              numOfCompleteRows++;
3386              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
3387              if (builder.hasMoreResults() && !builder.getMoreResults()) {
3388                break;
3389              }
3390            }
3391          } else if (!moreRows && !results.isEmpty()) {
3392            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
3393            // last result is false. Otherwise it's possible that: the first nextRaw returned
3394            // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
3395            // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
3396            // return with moreRows=false, which means moreResultsInRegion would be false, it will
3397            // be a contradictory state (HBASE-21206).
3398            int lastIdx = results.size() - 1;
3399            Result r = results.get(lastIdx);
3400            if (r.mayHaveMoreCellsInRow()) {
3401              results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
3402            }
3403          }
3404          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
3405          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
3406          boolean resultsLimitReached = numOfResults >= maxResults;
3407          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;
3408
3409          if (limitReached || !moreRows) {
3410            // With block size limit, we may exceed size limit without collecting any results.
3411            // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
3412            // or cursor if results were collected, for example for cell size or heap size limits.
3413            boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
3414            // We only want to mark a ScanResponse as a heartbeat message in the event that
3415            // there are more values to be read server side. If there aren't more values,
3416            // marking it as a heartbeat is wasteful because the client will need to issue
3417            // another ScanRequest only to realize that they already have all the values
3418            if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
3419              // Heartbeat messages occur when the time limit has been reached, or size limit has
3420              // been reached before collecting any results. This can happen for heavily filtered
3421              // scans which scan over too many blocks.
3422              builder.setHeartbeatMessage(true);
3423              if (rsh.needCursor) {
3424                Cell cursorCell = scannerContext.getLastPeekedCell();
3425                if (cursorCell != null) {
3426                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
3427                }
3428              }
3429            }
3430            break;
3431          }
3432          values.clear();
3433        }
3434        if (rpcCall != null) {
3435          rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
3436        }
3437        builder.setMoreResultsInRegion(moreRows);
3438        // Check to see if the client requested that we track metrics server side. If the
3439        // client requested metrics, retrieve the metrics from the scanner context.
3440        if (trackMetrics) {
3441          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
3442          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
3443          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
3444
3445          for (Entry<String, Long> entry : metrics.entrySet()) {
3446            pairBuilder.setName(entry.getKey());
3447            pairBuilder.setValue(entry.getValue());
3448            metricBuilder.addMetrics(pairBuilder.build());
3449          }
3450
3451          builder.setScanMetrics(metricBuilder.build());
3452        }
3453      }
3454    } finally {
3455      region.closeRegionOperation();
3456      // Update serverside metrics, even on error.
3457      long end = EnvironmentEdgeManager.currentTime();
3458      long responseCellSize = rpcCall != null ? rpcCall.getResponseCellSize() : 0;
3459      region.getMetrics().updateScanTime(end - before);
3460      final MetricsRegionServer metricsRegionServer = server.getMetrics();
3461      if (metricsRegionServer != null) {
3462        metricsRegionServer.updateScanSize(region.getTableDescriptor().getTableName(),
3463          responseCellSize);
3464        metricsRegionServer.updateScanTime(region.getTableDescriptor().getTableName(),
3465          end - before);
3466        metricsRegionServer.updateReadQueryMeter(region.getRegionInfo().getTable(),
3467          numOfNextRawCalls);
3468      }
3469    }
3470    // coprocessor postNext hook
3471    if (region.getCoprocessorHost() != null) {
3472      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
3473    }
3474  }
3475
3476  /**
3477   * Scan data in a table.
3478   * @param controller the RPC controller
3479   * @param request    the scan request
3480   */
3481  @Override
3482  public ScanResponse scan(final RpcController controller, final ScanRequest request)
3483    throws ServiceException {
3484    if (controller != null && !(controller instanceof HBaseRpcController)) {
3485      throw new UnsupportedOperationException(
3486        "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
3487    }
3488    if (!request.hasScannerId() && !request.hasScan()) {
3489      throw new ServiceException(
3490        new DoNotRetryIOException("Missing required input: scannerId or scan"));
3491    }
3492    try {
3493      checkOpen();
3494    } catch (IOException e) {
3495      if (request.hasScannerId()) {
3496        String scannerName = toScannerName(request.getScannerId());
3497        if (LOG.isDebugEnabled()) {
3498          LOG.debug(
3499            "Server shutting down and client tried to access missing scanner " + scannerName);
3500        }
3501        final LeaseManager leaseManager = server.getLeaseManager();
3502        if (leaseManager != null) {
3503          try {
3504            leaseManager.cancelLease(scannerName);
3505          } catch (LeaseException le) {
3506            // No problem, ignore
3507            if (LOG.isTraceEnabled()) {
3508              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
3509            }
3510          }
3511        }
3512      }
3513      throw new ServiceException(e);
3514    }
3515    requestCount.increment();
3516    rpcScanRequestCount.increment();
3517    RegionScannerHolder rsh;
3518    ScanResponse.Builder builder = ScanResponse.newBuilder();
3519    String scannerName;
3520    try {
3521      if (request.hasScannerId()) {
3522        // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
3523        // for more details.
3524        long scannerId = request.getScannerId();
3525        builder.setScannerId(scannerId);
3526        scannerName = toScannerName(scannerId);
3527        rsh = getRegionScanner(request);
3528      } else {
3529        Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder);
3530        scannerName = scannerNameAndRSH.getFirst();
3531        rsh = scannerNameAndRSH.getSecond();
3532      }
3533    } catch (IOException e) {
3534      if (e == SCANNER_ALREADY_CLOSED) {
3535        // Now we will close scanner automatically if there are no more results for this region but
3536        // the old client will still send a close request to us. Just ignore it and return.
3537        return builder.build();
3538      }
3539      throw new ServiceException(e);
3540    }
3541    if (rsh.fullRegionScan) {
3542      rpcFullScanRequestCount.increment();
3543    }
3544    HRegion region = rsh.r;
3545    LeaseManager.Lease lease;
3546    try {
3547      // Remove lease while its being processed in server; protects against case
3548      // where processing of request takes > lease expiration time. or null if none found.
3549      lease = server.getLeaseManager().removeLease(scannerName);
3550    } catch (LeaseException e) {
3551      throw new ServiceException(e);
3552    }
3553    if (request.hasRenew() && request.getRenew()) {
3554      // add back and return
3555      addScannerLeaseBack(lease);
3556      try {
3557        checkScanNextCallSeq(request, rsh);
3558      } catch (OutOfOrderScannerNextException e) {
3559        throw new ServiceException(e);
3560      }
3561      return builder.build();
3562    }
3563    OperationQuota quota;
3564    try {
3565      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
3566    } catch (IOException e) {
3567      addScannerLeaseBack(lease);
3568      throw new ServiceException(e);
3569    }
3570    try {
3571      checkScanNextCallSeq(request, rsh);
3572    } catch (OutOfOrderScannerNextException e) {
3573      addScannerLeaseBack(lease);
3574      throw new ServiceException(e);
3575    }
3576    // Now we have increased the next call sequence. If we give client an error, the retry will
3577    // never success. So we'd better close the scanner and return a DoNotRetryIOException to client
3578    // and then client will try to open a new scanner.
3579    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
3580    int rows; // this is scan.getCaching
3581    if (request.hasNumberOfRows()) {
3582      rows = request.getNumberOfRows();
3583    } else {
3584      rows = closeScanner ? 0 : 1;
3585    }
3586    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
3587    // now let's do the real scan.
3588    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
3589    RegionScanner scanner = rsh.s;
3590    // this is the limit of rows for this scan, if we the number of rows reach this value, we will
3591    // close the scanner.
3592    int limitOfRows;
3593    if (request.hasLimitOfRows()) {
3594      limitOfRows = request.getLimitOfRows();
3595    } else {
3596      limitOfRows = -1;
3597    }
3598    boolean scannerClosed = false;
3599    try {
3600      List<Result> results = new ArrayList<>(Math.min(rows, 512));
3601      if (rows > 0) {
3602        boolean done = false;
3603        // Call coprocessor. Get region info from scanner.
3604        if (region.getCoprocessorHost() != null) {
3605          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
3606          if (!results.isEmpty()) {
3607            for (Result r : results) {
3608              // add cell size from CP results so we can track response size and update limits
3609              // when calling scan below if !done. We'll also have tracked block size if the CP
3610              // got results from hbase, since StoreScanner tracks that for all calls automatically.
3611              addSize(rpcCall, r);
3612            }
3613          }
3614          if (bypass != null && bypass.booleanValue()) {
3615            done = true;
3616          }
3617        }
3618        if (!done) {
3619          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
3620            results, builder, rpcCall);
3621        } else {
3622          builder.setMoreResultsInRegion(!results.isEmpty());
3623        }
3624      } else {
3625        // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
3626        builder.setMoreResultsInRegion(true);
3627      }
3628
3629      quota.addScanResult(results);
3630      addResults(builder, results, (HBaseRpcController) controller,
3631        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
3632        isClientCellBlockSupport(rpcCall));
3633      if (scanner.isFilterDone() && results.isEmpty()) {
3634        // If the scanner's filter - if any - is done with the scan
3635        // only set moreResults to false if the results is empty. This is used to keep compatible
3636        // with the old scan implementation where we just ignore the returned results if moreResults
3637        // is false. Can remove the isEmpty check after we get rid of the old implementation.
3638        builder.setMoreResults(false);
3639      }
3640      // Later we may close the scanner depending on this flag so here we need to make sure that we
3641      // have already set this flag.
3642      assert builder.hasMoreResultsInRegion();
3643      // we only set moreResults to false in the above code, so set it to true if we haven't set it
3644      // yet.
3645      if (!builder.hasMoreResults()) {
3646        builder.setMoreResults(true);
3647      }
3648      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
3649        // Record the last cell of the last result if it is a partial result
3650        // We need this to calculate the complete rows we have returned to client as the
3651        // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
3652        // current row. We may filter out all the remaining cells for the current row and just
3653        // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
3654        // check for row change.
3655        Result lastResult = results.get(results.size() - 1);
3656        if (lastResult.mayHaveMoreCellsInRow()) {
3657          rsh.rowOfLastPartialResult = lastResult.getRow();
3658        } else {
3659          rsh.rowOfLastPartialResult = null;
3660        }
3661      }
3662      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
3663        scannerClosed = true;
3664        closeScanner(region, scanner, scannerName, rpcCall);
3665      }
3666
3667      // There's no point returning to a timed out client. Throwing ensures scanner is closed
3668      if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) {
3669        throw new TimeoutIOException("Client deadline exceeded, cannot return results");
3670      }
3671
3672      return builder.build();
3673    } catch (IOException e) {
3674      try {
3675        // scanner is closed here
3676        scannerClosed = true;
3677        // The scanner state might be left in a dirty state, so we will tell the Client to
3678        // fail this RPC and close the scanner while opening up another one from the start of
3679        // row that the client has last seen.
3680        closeScanner(region, scanner, scannerName, rpcCall);
3681
3682        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
3683        // used in two different semantics.
3684        // (1) The first is to close the client scanner and bubble up the exception all the way
3685        // to the application. This is preferred when the exception is really un-recoverable
3686        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
3687        // bucket usually.
3688        // (2) Second semantics is to close the current region scanner only, but continue the
3689        // client scanner by overriding the exception. This is usually UnknownScannerException,
3690        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
3691        // application-level ClientScanner has to continue without bubbling up the exception to
3692        // the client. See ClientScanner code to see how it deals with these special exceptions.
3693        if (e instanceof DoNotRetryIOException) {
3694          throw e;
3695        }
3696
3697        // If it is a FileNotFoundException, wrap as a
3698        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
3699        if (e instanceof FileNotFoundException) {
3700          throw new DoNotRetryIOException(e);
3701        }
3702
3703        // We closed the scanner already. Instead of throwing the IOException, and client
3704        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
3705        // a special exception to save an RPC.
3706        if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
3707          // 1.4.0+ clients know how to handle
3708          throw new ScannerResetException("Scanner is closed on the server-side", e);
3709        } else {
3710          // older clients do not know about SRE. Just throw USE, which they will handle
3711          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
3712            + " scanner state for clients older than 1.3.", e);
3713        }
3714      } catch (IOException ioe) {
3715        throw new ServiceException(ioe);
3716      }
3717    } finally {
3718      if (!scannerClosed) {
3719        // Adding resets expiration time on lease.
3720        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
3721        if (rpcCall != null) {
3722          rpcCall.setCallBack(rsh.shippedCallback);
3723        } else {
3724          // If context is null,here we call rsh.shippedCallback directly to reuse the logic in
3725          // rsh.shippedCallback to release the internal resources in rsh,and lease is also added
3726          // back to regionserver's LeaseManager in rsh.shippedCallback.
3727          runShippedCallback(rsh);
3728        }
3729      }
3730      quota.close();
3731    }
3732  }
3733
3734  private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
3735    assert rsh.shippedCallback != null;
3736    try {
3737      rsh.shippedCallback.run();
3738    } catch (IOException ioe) {
3739      throw new ServiceException(ioe);
3740    }
3741  }
3742
3743  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
3744    RpcCallContext context) throws IOException {
3745    if (region.getCoprocessorHost() != null) {
3746      if (region.getCoprocessorHost().preScannerClose(scanner)) {
3747        // bypass the actual close.
3748        return;
3749      }
3750    }
3751    RegionScannerHolder rsh = scanners.remove(scannerName);
3752    if (rsh != null) {
3753      if (context != null) {
3754        context.setCallBack(rsh.closeCallBack);
3755      } else {
3756        rsh.s.close();
3757      }
3758      if (region.getCoprocessorHost() != null) {
3759        region.getCoprocessorHost().postScannerClose(scanner);
3760      }
3761      closedScanners.put(scannerName, scannerName);
3762    }
3763  }
3764
  /**
   * Execute a coprocessor endpoint registered at the region server level by delegating to the
   * server's implementation, after the standard RPC pre-check.
   * @param controller the RPC controller
   * @param request    the coprocessor service request
   * @return the response produced by the coprocessor endpoint
   * @throws ServiceException if the pre-check or the endpoint invocation fails
   */
  @Override
  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
    CoprocessorServiceRequest request) throws ServiceException {
    rpcPreCheck("execRegionServerService");
    return server.execRegionServerService(controller, request);
  }
3771
3772  @Override
3773  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
3774    GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
3775    try {
3776      final RegionServerSpaceQuotaManager manager = server.getRegionServerSpaceQuotaManager();
3777      final GetSpaceQuotaSnapshotsResponse.Builder builder =
3778        GetSpaceQuotaSnapshotsResponse.newBuilder();
3779      if (manager != null) {
3780        final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
3781        for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
3782          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
3783            .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
3784            .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build());
3785        }
3786      }
3787      return builder.build();
3788    } catch (Exception e) {
3789      throw new ServiceException(e);
3790    }
3791  }
3792
3793  @Override
3794  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
3795    ClearRegionBlockCacheRequest request) throws ServiceException {
3796    rpcPreCheck("clearRegionBlockCache");
3797    ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder();
3798    CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
3799    List<HRegion> regions = getRegions(request.getRegionList(), stats);
3800    for (HRegion region : regions) {
3801      try {
3802        stats = stats.append(this.server.clearRegionBlockCache(region));
3803      } catch (Exception e) {
3804        stats.addException(region.getRegionInfo().getRegionName(), e);
3805      }
3806    }
3807    stats.withMaxCacheSize(server.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
3808    return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
3809  }
3810
3811  private void executeOpenRegionProcedures(OpenRegionRequest request,
3812    Map<TableName, TableDescriptor> tdCache) {
3813    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
3814    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
3815      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
3816      TableName tableName = regionInfo.getTable();
3817      TableDescriptor tableDesc = tdCache.get(tableName);
3818      if (tableDesc == null) {
3819        try {
3820          tableDesc = server.getTableDescriptors().get(regionInfo.getTable());
3821        } catch (IOException e) {
3822          // Here we do not fail the whole method since we also need deal with other
3823          // procedures, and we can not ignore this one, so we still schedule a
3824          // AssignRegionHandler and it will report back to master if we still can not get the
3825          // TableDescriptor.
3826          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
3827            regionInfo.getTable(), e);
3828        }
3829        if (tableDesc != null) {
3830          tdCache.put(tableName, tableDesc);
3831        }
3832      }
3833      if (regionOpenInfo.getFavoredNodesCount() > 0) {
3834        server.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
3835          regionOpenInfo.getFavoredNodesList());
3836      }
3837      long procId = regionOpenInfo.getOpenProcId();
3838      if (server.submitRegionProcedure(procId)) {
3839        server.getExecutorService().submit(
3840          AssignRegionHandler.create(server, regionInfo, procId, tableDesc, masterSystemTime));
3841      }
3842    }
3843  }
3844
3845  private void executeCloseRegionProcedures(CloseRegionRequest request) {
3846    String encodedName;
3847    try {
3848      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
3849    } catch (DoNotRetryIOException e) {
3850      throw new UncheckedIOException("Should not happen", e);
3851    }
3852    ServerName destination = request.hasDestinationServer()
3853      ? ProtobufUtil.toServerName(request.getDestinationServer())
3854      : null;
3855    long procId = request.getCloseProcId();
3856    boolean evictCache = request.getEvictCache();
3857    if (server.submitRegionProcedure(procId)) {
3858      server.getExecutorService().submit(
3859        UnassignRegionHandler.create(server, encodedName, procId, false, destination, evictCache));
3860    }
3861  }
3862
3863  private void executeProcedures(RemoteProcedureRequest request) {
3864    RSProcedureCallable callable;
3865    try {
3866      callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
3867        .getDeclaredConstructor().newInstance();
3868    } catch (Exception e) {
3869      LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(),
3870        request.getProcId(), e);
3871      server.remoteProcedureComplete(request.getProcId(), e);
3872      return;
3873    }
3874    callable.init(request.getProcData().toByteArray(), server);
3875    LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
3876    server.executeProcedure(request.getProcId(), callable);
3877  }
3878
3879  @Override
3880  @QosPriority(priority = HConstants.ADMIN_QOS)
3881  public ExecuteProceduresResponse executeProcedures(RpcController controller,
3882    ExecuteProceduresRequest request) throws ServiceException {
3883    try {
3884      checkOpen();
3885      throwOnWrongStartCode(request);
3886      server.getRegionServerCoprocessorHost().preExecuteProcedures();
3887      if (request.getOpenRegionCount() > 0) {
3888        // Avoid reading from the TableDescritor every time(usually it will read from the file
3889        // system)
3890        Map<TableName, TableDescriptor> tdCache = new HashMap<>();
3891        request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
3892      }
3893      if (request.getCloseRegionCount() > 0) {
3894        request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
3895      }
3896      if (request.getProcCount() > 0) {
3897        request.getProcList().forEach(this::executeProcedures);
3898      }
3899      server.getRegionServerCoprocessorHost().postExecuteProcedures();
3900      return ExecuteProceduresResponse.getDefaultInstance();
3901    } catch (IOException e) {
3902      throw new ServiceException(e);
3903    }
3904  }
3905
3906  @Override
3907  public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller,
3908    GetAllBootstrapNodesRequest request) throws ServiceException {
3909    GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder();
3910    server.getBootstrapNodes()
3911      .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server)));
3912    return builder.build();
3913  }
3914}