001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.io.UncheckedIOException;
024import java.net.BindException;
025import java.net.InetAddress;
026import java.net.InetSocketAddress;
027import java.nio.ByteBuffer;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.HashMap;
032import java.util.Iterator;
033import java.util.List;
034import java.util.Map;
035import java.util.Map.Entry;
036import java.util.NavigableMap;
037import java.util.Set;
038import java.util.TreeSet;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.concurrent.ConcurrentMap;
041import java.util.concurrent.TimeUnit;
042import java.util.concurrent.atomic.AtomicBoolean;
043import java.util.concurrent.atomic.AtomicLong;
044import java.util.concurrent.atomic.LongAdder;
045import org.apache.commons.lang3.mutable.MutableObject;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.fs.FileSystem;
048import org.apache.hadoop.fs.Path;
049import org.apache.hadoop.hbase.ByteBufferExtendedCell;
050import org.apache.hadoop.hbase.CacheEvictionStats;
051import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
052import org.apache.hadoop.hbase.Cell;
053import org.apache.hadoop.hbase.CellScannable;
054import org.apache.hadoop.hbase.CellScanner;
055import org.apache.hadoop.hbase.CellUtil;
056import org.apache.hadoop.hbase.DoNotRetryIOException;
057import org.apache.hadoop.hbase.DroppedSnapshotException;
058import org.apache.hadoop.hbase.HBaseIOException;
059import org.apache.hadoop.hbase.HBaseRpcServicesBase;
060import org.apache.hadoop.hbase.HConstants;
061import org.apache.hadoop.hbase.MultiActionResultTooLarge;
062import org.apache.hadoop.hbase.NotServingRegionException;
063import org.apache.hadoop.hbase.PrivateCellUtil;
064import org.apache.hadoop.hbase.RegionTooBusyException;
065import org.apache.hadoop.hbase.Server;
066import org.apache.hadoop.hbase.ServerName;
067import org.apache.hadoop.hbase.TableName;
068import org.apache.hadoop.hbase.UnknownScannerException;
069import org.apache.hadoop.hbase.client.Append;
070import org.apache.hadoop.hbase.client.CheckAndMutate;
071import org.apache.hadoop.hbase.client.CheckAndMutateResult;
072import org.apache.hadoop.hbase.client.Delete;
073import org.apache.hadoop.hbase.client.Durability;
074import org.apache.hadoop.hbase.client.Get;
075import org.apache.hadoop.hbase.client.Increment;
076import org.apache.hadoop.hbase.client.Mutation;
077import org.apache.hadoop.hbase.client.OperationWithAttributes;
078import org.apache.hadoop.hbase.client.Put;
079import org.apache.hadoop.hbase.client.RegionInfo;
080import org.apache.hadoop.hbase.client.RegionReplicaUtil;
081import org.apache.hadoop.hbase.client.Result;
082import org.apache.hadoop.hbase.client.Row;
083import org.apache.hadoop.hbase.client.Scan;
084import org.apache.hadoop.hbase.client.TableDescriptor;
085import org.apache.hadoop.hbase.client.VersionInfoUtil;
086import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
087import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
088import org.apache.hadoop.hbase.exceptions.ScannerResetException;
089import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
090import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
091import org.apache.hadoop.hbase.io.ByteBuffAllocator;
092import org.apache.hadoop.hbase.io.hfile.BlockCache;
093import org.apache.hadoop.hbase.ipc.HBaseRpcController;
094import org.apache.hadoop.hbase.ipc.PriorityFunction;
095import org.apache.hadoop.hbase.ipc.QosPriority;
096import org.apache.hadoop.hbase.ipc.RpcCall;
097import org.apache.hadoop.hbase.ipc.RpcCallContext;
098import org.apache.hadoop.hbase.ipc.RpcCallback;
099import org.apache.hadoop.hbase.ipc.RpcServer;
100import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
101import org.apache.hadoop.hbase.ipc.RpcServerFactory;
102import org.apache.hadoop.hbase.ipc.RpcServerInterface;
103import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
104import org.apache.hadoop.hbase.ipc.ServerRpcController;
105import org.apache.hadoop.hbase.net.Address;
106import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
107import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
108import org.apache.hadoop.hbase.quotas.OperationQuota;
109import org.apache.hadoop.hbase.quotas.QuotaUtil;
110import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
111import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
112import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
113import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
114import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease;
115import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException;
116import org.apache.hadoop.hbase.regionserver.Region.Operation;
117import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
118import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
119import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler;
120import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
121import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
122import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
123import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
124import org.apache.hadoop.hbase.replication.ReplicationUtils;
125import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker;
126import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker;
127import org.apache.hadoop.hbase.security.Superusers;
128import org.apache.hadoop.hbase.security.access.Permission;
129import org.apache.hadoop.hbase.util.Bytes;
130import org.apache.hadoop.hbase.util.DNS;
131import org.apache.hadoop.hbase.util.DNS.ServerType;
132import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
133import org.apache.hadoop.hbase.util.Pair;
134import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
135import org.apache.hadoop.hbase.wal.WAL;
136import org.apache.hadoop.hbase.wal.WALEdit;
137import org.apache.hadoop.hbase.wal.WALKey;
138import org.apache.hadoop.hbase.wal.WALSplitUtil;
139import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
140import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
141import org.apache.yetus.audience.InterfaceAudience;
142import org.slf4j.Logger;
143import org.slf4j.LoggerFactory;
144
145import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
146import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
147import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
148import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
149import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
150import org.apache.hbase.thirdparty.com.google.protobuf.Message;
151import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
152import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
153import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
154import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
155import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
156
157import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
158import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
159import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
246
247/**
248 * Implements the regionserver RPC services.
249 */
250@InterfaceAudience.Private
251public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
252  implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface {
253
  private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);

  /** Config key naming the RPC scheduler factory class to use for the region server. */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value (in milliseconds) of
   * {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Presumably a byte-size cap on scan responses — confirm against the scan handler.
  private final long maxScannerResultSize;

  // NOTE(review): presumably produces the unique scanner names used as keys below — confirm.
  private ScannerIdGenerator scannerIdGenerator;
  // Live scanners keyed by scanner name; entries are removed on close or lease expiry.
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
  // which may send next or close request to a region scanner which has already been exhausted. The
  // entries will be removed automatically after scannerLeaseTimeoutPeriod.
  private final Cache<String, String> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta (milliseconds) to use for the scan time limit; see
   * {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}.
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;
  /**
   * Whether we should reject requests with very high no of rows i.e. beyond threshold defined by
   * rowSizeWarnThreshold
   */
  private final boolean rejectRowsWithSizeOverThreshold;

  // NOTE(review): presumably guards against concurrent clear-compaction-queues requests —
  // confirm in the clearCompactionQueues RPC handler.
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below booleans
   * to selectively enable/disable either Admin or Client Service (Rare is the case where you would
   * ever turn off one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
    "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
    "hbase.regionserver.client.executorService";
  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
    "hbase.regionserver.client.meta.executorService";
349
350  /**
351   * An Rpc callback for closing a RegionScanner.
352   */
353  private static final class RegionScannerCloseCallBack implements RpcCallback {
354
355    private final RegionScanner scanner;
356
357    public RegionScannerCloseCallBack(RegionScanner scanner) {
358      this.scanner = scanner;
359    }
360
361    @Override
362    public void run() throws IOException {
363      this.scanner.close();
364    }
365  }
366
367  /**
368   * An Rpc callback for doing shipped() call on a RegionScanner.
369   */
370  private class RegionScannerShippedCallBack implements RpcCallback {
371    private final String scannerName;
372    private final Shipper shipper;
373    private final Lease lease;
374
375    public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
376      this.scannerName = scannerName;
377      this.shipper = shipper;
378      this.lease = lease;
379    }
380
381    @Override
382    public void run() throws IOException {
383      this.shipper.shipped();
384      // We're done. On way out re-add the above removed lease. The lease was temp removed for this
385      // Rpc call and we are at end of the call now. Time to add it back.
386      if (scanners.containsKey(scannerName)) {
387        if (lease != null) {
388          server.getLeaseManager().addLease(lease);
389        }
390      }
391    }
392  }
393
394  /**
395   * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
396   * completion of multiGets.
397   */
398  static class RegionScannersCloseCallBack implements RpcCallback {
399    private final List<RegionScanner> scanners = new ArrayList<>();
400
401    public void addScanner(RegionScanner scanner) {
402      this.scanners.add(scanner);
403    }
404
405    @Override
406    public void run() {
407      for (RegionScanner scanner : scanners) {
408        try {
409          scanner.close();
410        } catch (IOException e) {
411          LOG.error("Exception while closing the scanner " + scanner, e);
412        }
413      }
414    }
415  }
416
417  /**
418   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
419   */
420  static final class RegionScannerHolder {
421    private final AtomicLong nextCallSeq = new AtomicLong(0);
422    private final RegionScanner s;
423    private final HRegion r;
424    private final RpcCallback closeCallBack;
425    private final RpcCallback shippedCallback;
426    private byte[] rowOfLastPartialResult;
427    private boolean needCursor;
428    private boolean fullRegionScan;
429    private final String clientIPAndPort;
430    private final String userName;
431
432    RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
433      RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
434      String clientIPAndPort, String userName) {
435      this.s = s;
436      this.r = r;
437      this.closeCallBack = closeCallBack;
438      this.shippedCallback = shippedCallback;
439      this.needCursor = needCursor;
440      this.fullRegionScan = fullRegionScan;
441      this.clientIPAndPort = clientIPAndPort;
442      this.userName = userName;
443    }
444
445    long getNextCallSeq() {
446      return nextCallSeq.get();
447    }
448
449    boolean incNextCallSeq(long currentSeq) {
450      // Use CAS to prevent multiple scan request running on the same scanner.
451      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
452    }
453
454    // Should be called only when we need to print lease expired messages otherwise
455    // cache the String once made.
456    @Override
457    public String toString() {
458      return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName
459        + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString();
460    }
461  }
462
463  /**
464   * Instantiated as a scanner lease. If the lease times out, the scanner is closed
465   */
466  private class ScannerListener implements LeaseListener {
467    private final String scannerName;
468
469    ScannerListener(final String n) {
470      this.scannerName = n;
471    }
472
473    @Override
474    public void leaseExpired() {
475      RegionScannerHolder rsh = scanners.remove(this.scannerName);
476      if (rsh == null) {
477        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
478        return;
479      }
480      LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
481      server.getMetrics().incrScannerLeaseExpired();
482      RegionScanner s = rsh.s;
483      HRegion region = null;
484      try {
485        region = server.getRegion(s.getRegionInfo().getRegionName());
486        if (region != null && region.getCoprocessorHost() != null) {
487          region.getCoprocessorHost().preScannerClose(s);
488        }
489      } catch (IOException e) {
490        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
491      } finally {
492        try {
493          s.close();
494          if (region != null && region.getCoprocessorHost() != null) {
495            region.getCoprocessorHost().postScannerClose(s);
496          }
497        } catch (IOException e) {
498          LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
499        }
500      }
501    }
502  }
503
504  private static ResultOrException getResultOrException(final ClientProtos.Result r,
505    final int index) {
506    return getResultOrException(ResponseConverter.buildActionResult(r), index);
507  }
508
509  private static ResultOrException getResultOrException(final Exception e, final int index) {
510    return getResultOrException(ResponseConverter.buildActionResult(e), index);
511  }
512
513  private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
514    final int index) {
515    return builder.setIndex(index).build();
516  }
517
518  /**
519   * Checks for the following pre-checks in order:
520   * <ol>
521   * <li>RegionServer is running</li>
522   * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
523   * </ol>
524   * @param requestName name of rpc request. Used in reporting failures to provide context.
525   * @throws ServiceException If any of the above listed pre-check fails.
526   */
527  private void rpcPreCheck(String requestName) throws ServiceException {
528    try {
529      checkOpen();
530      requirePermission(requestName, Permission.Action.ADMIN);
531    } catch (IOException ioe) {
532      throw new ServiceException(ioe);
533    }
534  }
535
536  private boolean isClientCellBlockSupport(RpcCallContext context) {
537    return context != null && context.isClientCellBlockSupported();
538  }
539
540  private void addResult(final MutateResponse.Builder builder, final Result result,
541    final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
542    if (result == null) return;
543    if (clientCellBlockSupported) {
544      builder.setResult(ProtobufUtil.toResultNoData(result));
545      rpcc.setCellScanner(result.cellScanner());
546    } else {
547      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
548      builder.setResult(pbr);
549    }
550  }
551
552  private void addResults(ScanResponse.Builder builder, List<Result> results,
553    HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
554    builder.setStale(!isDefaultRegion);
555    if (results.isEmpty()) {
556      return;
557    }
558    if (clientCellBlockSupported) {
559      for (Result res : results) {
560        builder.addCellsPerResult(res.size());
561        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
562      }
563      controller.setCellScanner(CellUtil.createCellScanner(results));
564    } else {
565      for (Result res : results) {
566        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
567        builder.addResults(pbr);
568      }
569    }
570  }
571
  /**
   * Builds and executes a CheckAndMutate from a list of mutation actions guarded by a single
   * condition. Only PUT/DELETE/INCREMENT/APPEND actions are accepted; a Get action is rejected
   * with a DoNotRetryIOException.
   * @param region                the region the mutations are applied to
   * @param actions               the mutation actions from the multi request
   * @param cellScanner           carries the cells backing the mutation protos
   * @param condition             the condition guarding the mutations
   * @param nonceGroup            the client's nonce group, used for increment/append idempotence
   * @param spaceQuotaEnforcement active space-quota policies each mutation is checked against
   * @return the check-and-mutate result; a success result with no data when there were no
   *         mutations, or the coprocessor pre-hook's result when it short-circuits the operation
   * @throws IOException if a mutation is malformed/oversized or the region operation fails
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
    CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Number of mutations whose cells have been fully consumed from cellScanner. It is bumped
    // immediately after each ProtobufUtil.toXxx(...) call so the finally block can skip the
    // cells of any remaining actions and keep the scanner aligned.
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      List<Mutation> mutations = new ArrayList<>();
      // Nonce of the last INCREMENT/APPEND seen; PUT/DELETE are idempotent and carry none.
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : actions) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto mutation = action.getMutation();
        MutationType type = mutation.getMutateType();
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(mutation, cellScanner);
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            mutations.add(put);
            break;
          case DELETE:
            // NOTE(review): unlike PUT/INCREMENT/APPEND, DELETE skips checkCellSizeLimit —
            // presumably because deletes carry no user value payload; confirm intentional.
            Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
            ++countOfCompleteMutation;
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            mutations.add(del);
            break;
          case INCREMENT:
            Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, increment);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
            mutations.add(increment);
            break;
          case APPEND:
            Append append = ProtobufUtil.toAppend(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, append);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
            mutations.add(append);
            break;
          default:
            throw new DoNotRetryIOException("invalid mutation type : " + type);
        }
      }

      if (mutations.size() == 0) {
        // Condition-only request with nothing to mutate: report success without touching the
        // region.
        return new CheckAndMutateResult(true, null);
      } else {
        CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
        CheckAndMutateResult result = null;
        if (region.getCoprocessorHost() != null) {
          result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
        }
        if (result == null) {
          // No coprocessor bypass: execute the real operation, then let post hooks amend the
          // result.
          result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
          if (region.getCoprocessorHost() != null) {
            result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
          }
        }
        return result;
      }
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }
648
649  /**
650   * Execute an append mutation.
651   * @return result to return to client if default operation should be bypassed as indicated by
652   *         RegionObserver, null otherwise
653   */
654  private Result append(final HRegion region, final OperationQuota quota,
655    final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
656    ActivePolicyEnforcement spaceQuota) throws IOException {
657    long before = EnvironmentEdgeManager.currentTime();
658    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
659    checkCellSizeLimit(region, append);
660    spaceQuota.getPolicyEnforcement(region).check(append);
661    quota.addMutation(append);
662    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
663    Result r = region.append(append, nonceGroup, nonce);
664    if (server.getMetrics() != null) {
665      server.getMetrics().updateAppend(region.getTableDescriptor().getTableName(),
666        EnvironmentEdgeManager.currentTime() - before);
667    }
668    return r == null ? Result.EMPTY_RESULT : r;
669  }
670
671  /**
672   * Execute an increment mutation.
673   */
674  private Result increment(final HRegion region, final OperationQuota quota,
675    final MutationProto mutation, final CellScanner cells, long nonceGroup,
676    ActivePolicyEnforcement spaceQuota) throws IOException {
677    long before = EnvironmentEdgeManager.currentTime();
678    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
679    checkCellSizeLimit(region, increment);
680    spaceQuota.getPolicyEnforcement(region).check(increment);
681    quota.addMutation(increment);
682    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
683    Result r = region.increment(increment, nonceGroup, nonce);
684    final MetricsRegionServer metricsRegionServer = server.getMetrics();
685    if (metricsRegionServer != null) {
686      metricsRegionServer.updateIncrement(region.getTableDescriptor().getTableName(),
687        EnvironmentEdgeManager.currentTime() - before);
688    }
689    return r == null ? Result.EMPTY_RESULT : r;
690  }
691
692  /**
693   * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
694   * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
695   * @param cellsToReturn Could be null. May be allocated in this method. This is what this method
696   *                      returns as a 'result'.
697   * @param closeCallBack the callback to be used with multigets
698   * @param context       the current RpcCallContext
699   * @return Return the <code>cellScanner</code> passed
700   */
701  private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
702    final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
703    final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
704    final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
705    ActivePolicyEnforcement spaceQuotaEnforcement) {
706    // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
707    // one at a time, we instead pass them in batch. Be aware that the corresponding
708    // ResultOrException instance that matches each Put or Delete is then added down in the
709    // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
710    // deferred/batched
711    List<ClientProtos.Action> mutations = null;
712    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
713    IOException sizeIOE = null;
714    Object lastBlock = null;
715    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
716      ResultOrException.newBuilder();
717    boolean hasResultOrException = false;
718    for (ClientProtos.Action action : actions.getActionList()) {
719      hasResultOrException = false;
720      resultOrExceptionBuilder.clear();
721      try {
722        Result r = null;
723
724        if (
725          context != null && context.isRetryImmediatelySupported()
726            && (context.getResponseCellSize() > maxQuotaResultSize
727              || context.getResponseBlockSize() + context.getResponseExceptionSize()
728                  > maxQuotaResultSize)
729        ) {
730
731          // We're storing the exception since the exception and reason string won't
732          // change after the response size limit is reached.
733          if (sizeIOE == null) {
734            // We don't need the stack un-winding do don't throw the exception.
735            // Throwing will kill the JVM's JIT.
736            //
737            // Instead just create the exception and then store it.
738            sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
739              + context.getResponseCellSize() + " BlockSize: " + context.getResponseBlockSize());
740
741            // Only report the exception once since there's only one request that
742            // caused the exception. Otherwise this number will dominate the exceptions count.
743            rpcServer.getMetrics().exception(sizeIOE);
744          }
745
746          // Now that there's an exception is known to be created
747          // use it for the response.
748          //
749          // This will create a copy in the builder.
750          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
751          resultOrExceptionBuilder.setException(pair);
752          context.incrementResponseExceptionSize(pair.getSerializedSize());
753          resultOrExceptionBuilder.setIndex(action.getIndex());
754          builder.addResultOrException(resultOrExceptionBuilder.build());
755          skipCellsForMutation(action, cellScanner);
756          continue;
757        }
758        if (action.hasGet()) {
759          long before = EnvironmentEdgeManager.currentTime();
760          ClientProtos.Get pbGet = action.getGet();
761          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
762          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
763          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
764          // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
765          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
766            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
767              + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
768              + "reverse Scan.");
769          }
770          try {
771            Get get = ProtobufUtil.toGet(pbGet);
772            if (context != null) {
773              r = get(get, (region), closeCallBack, context);
774            } else {
775              r = region.get(get);
776            }
777          } finally {
778            final MetricsRegionServer metricsRegionServer = server.getMetrics();
779            if (metricsRegionServer != null) {
780              metricsRegionServer.updateGet(region.getTableDescriptor().getTableName(),
781                EnvironmentEdgeManager.currentTime() - before);
782            }
783          }
784        } else if (action.hasServiceCall()) {
785          hasResultOrException = true;
786          Message result = execServiceOnRegion(region, action.getServiceCall());
787          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
788            ClientProtos.CoprocessorServiceResult.newBuilder();
789          resultOrExceptionBuilder.setServiceResult(serviceResultBuilder
790            .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName())
791              // TODO: Copy!!!
792              .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
793        } else if (action.hasMutation()) {
794          MutationType type = action.getMutation().getMutateType();
795          if (
796            type != MutationType.PUT && type != MutationType.DELETE && mutations != null
797              && !mutations.isEmpty()
798          ) {
799            // Flush out any Puts or Deletes already collected.
800            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
801              spaceQuotaEnforcement);
802            mutations.clear();
803          }
804          switch (type) {
805            case APPEND:
806              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
807                spaceQuotaEnforcement);
808              break;
809            case INCREMENT:
810              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
811                spaceQuotaEnforcement);
812              break;
813            case PUT:
814            case DELETE:
815              // Collect the individual mutations and apply in a batch
816              if (mutations == null) {
817                mutations = new ArrayList<>(actions.getActionCount());
818              }
819              mutations.add(action);
820              break;
821            default:
822              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
823          }
824        } else {
825          throw new HBaseIOException("Unexpected Action type");
826        }
827        if (r != null) {
828          ClientProtos.Result pbResult = null;
829          if (isClientCellBlockSupport(context)) {
830            pbResult = ProtobufUtil.toResultNoData(r);
831            // Hard to guess the size here. Just make a rough guess.
832            if (cellsToReturn == null) {
833              cellsToReturn = new ArrayList<>();
834            }
835            cellsToReturn.add(r);
836          } else {
837            pbResult = ProtobufUtil.toResult(r);
838          }
839          lastBlock = addSize(context, r, lastBlock);
840          hasResultOrException = true;
841          resultOrExceptionBuilder.setResult(pbResult);
842        }
843        // Could get to here and there was no result and no exception. Presumes we added
844        // a Put or Delete to the collecting Mutations List for adding later. In this
845        // case the corresponding ResultOrException instance for the Put or Delete will be added
846        // down in the doNonAtomicBatchOp method call rather than up here.
847      } catch (IOException ie) {
848        rpcServer.getMetrics().exception(ie);
849        hasResultOrException = true;
850        NameBytesPair pair = ResponseConverter.buildException(ie);
851        resultOrExceptionBuilder.setException(pair);
852        context.incrementResponseExceptionSize(pair.getSerializedSize());
853      }
854      if (hasResultOrException) {
855        // Propagate index.
856        resultOrExceptionBuilder.setIndex(action.getIndex());
857        builder.addResultOrException(resultOrExceptionBuilder.build());
858      }
859    }
860    // Finish up any outstanding mutations
861    if (!CollectionUtils.isEmpty(mutations)) {
862      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
863    }
864    return cellsToReturn;
865  }
866
867  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
868    if (r.maxCellSize > 0) {
869      CellScanner cells = m.cellScanner();
870      while (cells.advance()) {
871        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
872        if (size > r.maxCellSize) {
873          String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of "
874            + r.maxCellSize + " bytes";
875          LOG.debug(msg);
876          throw new DoNotRetryIOException(msg);
877        }
878      }
879    }
880  }
881
  // Execute an atomic batch (mutateRow / batch-with-RowMutations); simply delegates to
  // doBatchOp with atomic=true and lets any IOException propagate to the caller.
  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Just throw the exception. The exception will be caught and then added to region-level
    // exception for RegionAction. Leaving the null to action result is ok since the null
    // result is viewed as failure by hbase client. And the region-level exception will be used
    // to replace the null result. see AsyncRequestFutureImpl#receiveMultiAction and
    // AsyncBatchRpcRetryingCaller#onComplete for more details.
    doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
  }
892
893  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
894    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
895    ActivePolicyEnforcement spaceQuotaEnforcement) {
896    try {
897      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
898        spaceQuotaEnforcement, false);
899    } catch (IOException e) {
900      // Set the exception for each action. The mutations in same RegionAction are group to
901      // different batch and then be processed individually. Hence, we don't set the region-level
902      // exception here for whole RegionAction.
903      for (Action mutation : mutations) {
904        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
905      }
906    }
907  }
908
909  /**
910   * Execute a list of mutations. nnn
911   */
912  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
913    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
914    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
915    throws IOException {
916    Mutation[] mArray = new Mutation[mutations.size()];
917    long before = EnvironmentEdgeManager.currentTime();
918    boolean batchContainsPuts = false, batchContainsDelete = false;
919    try {
920      /**
921       * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions
922       * since mutation array may have been reoredered.In order to return the right result or
923       * exception to the corresponding actions, We need to know which action is the mutation belong
924       * to. We can't sort ClientProtos.Action array, since they are bonded to cellscanners.
925       */
926      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
927      int i = 0;
928      long nonce = HConstants.NO_NONCE;
929      for (ClientProtos.Action action : mutations) {
930        if (action.hasGet()) {
931          throw new DoNotRetryIOException(
932            "Atomic put and/or delete only, not a Get=" + action.getGet());
933        }
934        MutationProto m = action.getMutation();
935        Mutation mutation;
936        switch (m.getMutateType()) {
937          case PUT:
938            mutation = ProtobufUtil.toPut(m, cells);
939            batchContainsPuts = true;
940            break;
941
942          case DELETE:
943            mutation = ProtobufUtil.toDelete(m, cells);
944            batchContainsDelete = true;
945            break;
946
947          case INCREMENT:
948            mutation = ProtobufUtil.toIncrement(m, cells);
949            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
950            break;
951
952          case APPEND:
953            mutation = ProtobufUtil.toAppend(m, cells);
954            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
955            break;
956
957          default:
958            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
959        }
960        mutationActionMap.put(mutation, action);
961        mArray[i++] = mutation;
962        checkCellSizeLimit(region, mutation);
963        // Check if a space quota disallows this mutation
964        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
965        quota.addMutation(mutation);
966      }
967
968      if (!region.getRegionInfo().isMetaRegion()) {
969        server.getMemStoreFlusher().reclaimMemStoreMemory();
970      }
971
972      // HBASE-17924
973      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
974      // order is preserved as its expected from the client
975      if (!atomic) {
976        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
977      }
978
979      OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);
980
981      // When atomic is true, it indicates that the mutateRow API or the batch API with
982      // RowMutations is called. In this case, we need to merge the results of the
983      // Increment/Append operations if the mutations include those operations, and set the merged
984      // result to the first element of the ResultOrException list
985      if (atomic) {
986        List<ResultOrException> resultOrExceptions = new ArrayList<>();
987        List<Result> results = new ArrayList<>();
988        for (i = 0; i < codes.length; i++) {
989          if (codes[i].getResult() != null) {
990            results.add(codes[i].getResult());
991          }
992          if (i != 0) {
993            resultOrExceptions
994              .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i));
995          }
996        }
997
998        if (results.isEmpty()) {
999          builder.addResultOrException(
1000            getResultOrException(ClientProtos.Result.getDefaultInstance(), 0));
1001        } else {
1002          // Merge the results of the Increment/Append operations
1003          List<Cell> cellList = new ArrayList<>();
1004          for (Result result : results) {
1005            if (result.rawCells() != null) {
1006              cellList.addAll(Arrays.asList(result.rawCells()));
1007            }
1008          }
1009          Result result = Result.create(cellList);
1010
1011          // Set the merged result of the Increment/Append operations to the first element of the
1012          // ResultOrException list
1013          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
1014        }
1015
1016        builder.addAllResultOrException(resultOrExceptions);
1017        return;
1018      }
1019
1020      for (i = 0; i < codes.length; i++) {
1021        Mutation currentMutation = mArray[i];
1022        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
1023        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
1024        Exception e;
1025        switch (codes[i].getOperationStatusCode()) {
1026          case BAD_FAMILY:
1027            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
1028            builder.addResultOrException(getResultOrException(e, index));
1029            break;
1030
1031          case SANITY_CHECK_FAILURE:
1032            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
1033            builder.addResultOrException(getResultOrException(e, index));
1034            break;
1035
1036          default:
1037            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
1038            builder.addResultOrException(getResultOrException(e, index));
1039            break;
1040
1041          case SUCCESS:
1042            builder.addResultOrException(
1043              getResultOrException(ClientProtos.Result.getDefaultInstance(), index));
1044            break;
1045
1046          case STORE_TOO_BUSY:
1047            e = new RegionTooBusyException(codes[i].getExceptionMsg());
1048            builder.addResultOrException(getResultOrException(e, index));
1049            break;
1050        }
1051      }
1052    } finally {
1053      int processedMutationIndex = 0;
1054      for (Action mutation : mutations) {
1055        // The non-null mArray[i] means the cell scanner has been read.
1056        if (mArray[processedMutationIndex++] == null) {
1057          skipCellsForMutation(mutation, cells);
1058        }
1059      }
1060      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1061    }
1062  }
1063
1064  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
1065    boolean batchContainsDelete) {
1066    final MetricsRegionServer metricsRegionServer = server.getMetrics();
1067    if (metricsRegionServer != null) {
1068      long after = EnvironmentEdgeManager.currentTime();
1069      if (batchContainsPuts) {
1070        metricsRegionServer.updatePutBatch(region.getTableDescriptor().getTableName(),
1071          after - starttime);
1072      }
1073      if (batchContainsDelete) {
1074        metricsRegionServer.updateDeleteBatch(region.getTableDescriptor().getTableName(),
1075          after - starttime);
1076      }
1077    }
1078  }
1079
1080  /**
1081   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
1082   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
1083   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
1084   *         exceptionMessage if any
1085   * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying
1086   *             edits for secondary replicas any more, see
1087   *             {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}.
1088   */
1089  @Deprecated
1090  private OperationStatus[] doReplayBatchOp(final HRegion region,
1091    final List<MutationReplay> mutations, long replaySeqId) throws IOException {
1092    long before = EnvironmentEdgeManager.currentTime();
1093    boolean batchContainsPuts = false, batchContainsDelete = false;
1094    try {
1095      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
1096        MutationReplay m = it.next();
1097
1098        if (m.getType() == MutationType.PUT) {
1099          batchContainsPuts = true;
1100        } else {
1101          batchContainsDelete = true;
1102        }
1103
1104        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
1105        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
1106        if (metaCells != null && !metaCells.isEmpty()) {
1107          for (Cell metaCell : metaCells) {
1108            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
1109            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1110            HRegion hRegion = region;
1111            if (compactionDesc != null) {
1112              // replay the compaction. Remove the files from stores only if we are the primary
1113              // region replica (thus own the files)
1114              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
1115                replaySeqId);
1116              continue;
1117            }
1118            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
1119            if (flushDesc != null && !isDefaultReplica) {
1120              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
1121              continue;
1122            }
1123            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
1124            if (regionEvent != null && !isDefaultReplica) {
1125              hRegion.replayWALRegionEventMarker(regionEvent);
1126              continue;
1127            }
1128            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
1129            if (bulkLoadEvent != null) {
1130              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
1131              continue;
1132            }
1133          }
1134          it.remove();
1135        }
1136      }
1137      requestCount.increment();
1138      if (!region.getRegionInfo().isMetaRegion()) {
1139        server.getMemStoreFlusher().reclaimMemStoreMemory();
1140      }
1141      return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
1142        replaySeqId);
1143    } finally {
1144      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1145    }
1146  }
1147
1148  private void closeAllScanners() {
1149    // Close any outstanding scanners. Means they'll get an UnknownScanner
1150    // exception next time they come in.
1151    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
1152      try {
1153        e.getValue().s.close();
1154      } catch (IOException ioe) {
1155        LOG.warn("Closing scanner " + e.getKey(), ioe);
1156      }
1157    }
1158  }
1159
  // Directly invoked only for testing
  public RSRpcServices(final HRegionServer rs) throws IOException {
    super(rs, rs.getProcessName());
    final Configuration conf = rs.getConfiguration();
    // Row-count threshold above which a batch is considered oversized...
    rowSizeWarnThreshold =
      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    // ...and whether such oversized batches are rejected outright (vs. only warned about).
    rejectRowsWithSizeOverThreshold =
      conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
    rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
    rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
    // Recently-closed scanners are remembered for one lease period so late client calls can be
    // answered deliberately rather than as unknown scanners.
    closedScanners = CacheBuilder.newBuilder()
      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }
1180
  @Override
  protected boolean defaultReservoirEnabled() {
    // Default the buffer reservoir to on for region servers; presumably this feeds the
    // ByteBuffAllocator pool toggle read in createRpcServer — confirm against the superclass.
    return true;
  }
1185
  @Override
  protected ServerType getDNSServerType() {
    // Identifies this process as a region server for DNS-based hostname resolution.
    return DNS.ServerType.REGIONSERVER;
  }
1190
  @Override
  protected String getHostname(Configuration conf, String defaultHostname) {
    // Operators may pin the RPC address via "hbase.regionserver.ipc.address";
    // otherwise fall back to the supplied default hostname.
    return conf.get("hbase.regionserver.ipc.address", defaultHostname);
  }
1195
  @Override
  protected String getPortConfigName() {
    // Configuration key whose value determines this server's RPC port.
    return HConstants.REGIONSERVER_PORT;
  }
1200
  @Override
  protected int getDefaultPort() {
    // Port used when the config key from getPortConfigName() is unset.
    return HConstants.DEFAULT_REGIONSERVER_PORT;
  }
1205
  @Override
  protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
    // Pluggable scheduler factory; defaults to the simple FIFO-style scheduler factory.
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }
1211
1212  protected RpcServerInterface createRpcServer(final Server server,
1213    final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
1214    final String name) throws IOException {
1215    final Configuration conf = server.getConfiguration();
1216    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
1217    try {
1218      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, // use final
1219                                                                                        // bindAddress
1220                                                                                        // for this
1221                                                                                        // server.
1222        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
1223    } catch (BindException be) {
1224      throw new IOException(be.getMessage() + ". To switch ports use the '"
1225        + HConstants.REGIONSERVER_PORT + "' configuration property.",
1226        be.getCause() != null ? be.getCause() : be);
1227    }
1228  }
1229
  // No-arg variant: same lookup as getRpcSchedulerFactoryClass(Configuration) but reads the
  // configuration from this server instance.
  protected Class<?> getRpcSchedulerFactoryClass() {
    final Configuration conf = server.getConfiguration();
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }
1235
  // Build the annotation-driven priority function used to prioritize incoming RPCs.
  protected PriorityFunction createPriority() {
    return new RSAnnotationReadingPriorityFunction(this);
  }
1239
  // Number of currently outstanding region scanners on this server.
  public int getScannersCount() {
    return scanners.size();
  }
1243
1244  /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
1245  RegionScanner getScanner(long scannerId) {
1246    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
1247    return rsh == null ? null : rsh.s;
1248  }
1249
  /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null. */
  private RegionScannerHolder getRegionScannerHolder(long scannerId) {
    // Scanners are keyed by the string form of the id; see toScannerName().
    return scanners.get(toScannerName(scannerId));
  }
1254
1255  public String getScanDetailsWithId(long scannerId) {
1256    RegionScanner scanner = getScanner(scannerId);
1257    if (scanner == null) {
1258      return null;
1259    }
1260    StringBuilder builder = new StringBuilder();
1261    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
1262    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
1263    builder.append(" operation_id: ").append(scanner.getOperationId());
1264    return builder.toString();
1265  }
1266
1267  public String getScanDetailsWithRequest(ScanRequest request) {
1268    try {
1269      if (!request.hasRegion()) {
1270        return null;
1271      }
1272      Region region = getRegion(request.getRegion());
1273      StringBuilder builder = new StringBuilder();
1274      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
1275      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
1276      for (NameBytesPair pair : request.getScan().getAttributeList()) {
1277        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
1278          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
1279          break;
1280        }
1281      }
1282      return builder.toString();
1283    } catch (IOException ignored) {
1284      return null;
1285    }
1286  }
1287
1288  /**
1289   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
1290   */
1291  long getScannerVirtualTime(long scannerId) {
1292    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
1293    return rsh == null ? 0L : rsh.getNextCallSeq();
1294  }
1295
1296  /**
1297   * Method to account for the size of retained cells and retained data blocks.
1298   * @param context   rpc call context
1299   * @param r         result to add size.
1300   * @param lastBlock last block to check whether we need to add the block size in context.
1301   * @return an object that represents the last referenced block from this response.
1302   */
1303  Object addSize(RpcCallContext context, Result r, Object lastBlock) {
1304    if (context != null && r != null && !r.isEmpty()) {
1305      for (Cell c : r.rawCells()) {
1306        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
1307
1308        // Since byte buffers can point all kinds of crazy places it's harder to keep track
1309        // of which blocks are kept alive by what byte buffer.
1310        // So we make a guess.
1311        if (c instanceof ByteBufferExtendedCell) {
1312          ByteBufferExtendedCell bbCell = (ByteBufferExtendedCell) c;
1313          ByteBuffer bb = bbCell.getValueByteBuffer();
1314          if (bb != lastBlock) {
1315            context.incrementResponseBlockSize(bb.capacity());
1316            lastBlock = bb;
1317          }
1318        } else {
1319          // We're using the last block being the same as the current block as
1320          // a proxy for pointing to a new block. This won't be exact.
1321          // If there are multiple gets that bounce back and forth
1322          // Then it's possible that this will over count the size of
1323          // referenced blocks. However it's better to over count and
1324          // use two rpcs than to OOME the regionserver.
1325          byte[] valueArray = c.getValueArray();
1326          if (valueArray != lastBlock) {
1327            context.incrementResponseBlockSize(valueArray.length);
1328            lastBlock = valueArray;
1329          }
1330        }
1331
1332      }
1333    }
1334    return lastBlock;
1335  }
1336
1337  /** Returns Remote client's ip and port else null if can't be determined. */
1338  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
1339      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
1340  static String getRemoteClientIpAndPort() {
1341    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
1342    if (rpcCall == null) {
1343      return HConstants.EMPTY_STRING;
1344    }
1345    InetAddress address = rpcCall.getRemoteAddress();
1346    if (address == null) {
1347      return HConstants.EMPTY_STRING;
1348    }
1349    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
1350    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
1351    // scanning than a hostname anyways.
1352    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
1353  }
1354
1355  /** Returns Remote client's username. */
1356  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
1357      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
1358  static String getUserName() {
1359    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
1360    if (rpcCall == null) {
1361      return HConstants.EMPTY_STRING;
1362    }
1363    return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING);
1364  }
1365
  /**
   * Registers a newly opened scanner: creates a lease (with the configured scanner lease timeout
   * and a {@link ScannerListener}), wires up the shipped/close RPC callbacks, and records the
   * resulting holder in the {@code scanners} map under {@code scannerName}.
   * @throws LeaseStillHeldException if a lease already exists for this scanner name
   */
  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
    HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
    Lease lease = server.getLeaseManager().createLease(scannerName, this.scannerLeaseTimeoutPeriod,
      new ScannerListener(scannerName));
    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
    // If the scanner itself implements RpcCallback, use it directly as the close callback;
    // otherwise wrap it.
    RpcCallback closeCallback =
      s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s);
    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
      needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
    // Scanner names are expected to be unique for the server's lifetime; a collision is a bug.
    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
      + scannerName + ", " + existing;
    return rsh;
  }
1380
1381  private boolean isFullRegionScan(Scan scan, HRegion region) {
1382    // If the scan start row equals or less than the start key of the region
1383    // and stop row greater than equals end key (if stop row present)
1384    // or if the stop row is empty
1385    // account this as a full region scan
1386    if (
1387      Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0
1388        && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0
1389          && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)
1390          || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
1391    ) {
1392      return true;
1393    }
1394    return false;
1395  }
1396
1397  /**
1398   * Find the HRegion based on a region specifier
1399   * @param regionSpecifier the region specifier
1400   * @return the corresponding region
1401   * @throws IOException if the specifier is not null, but failed to find the region
1402   */
1403  public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException {
1404    return server.getRegion(regionSpecifier.getValue().toByteArray());
1405  }
1406
1407  /**
1408   * Find the List of HRegions based on a list of region specifiers
1409   * @param regionSpecifiers the list of region specifiers
1410   * @return the corresponding list of regions
1411   * @throws IOException if any of the specifiers is not null, but failed to find the region
1412   */
1413  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
1414    final CacheEvictionStatsBuilder stats) {
1415    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
1416    for (RegionSpecifier regionSpecifier : regionSpecifiers) {
1417      try {
1418        regions.add(server.getRegion(regionSpecifier.getValue().toByteArray()));
1419      } catch (NotServingRegionException e) {
1420        stats.addException(regionSpecifier.getValue().toByteArray(), e);
1421      }
1422    }
1423    return regions;
1424  }
1425
  /** Returns the {@link PriorityFunction} this RPC service uses. */
  PriorityFunction getPriority() {
    return priority;
  }
1429
  /** Returns the server's RPC quota manager. */
  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return server.getRegionServerRpcQuotaManager();
  }
1433
  /** Returns the server's space quota manager. */
  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return server.getRegionServerSpaceQuotaManager();
  }
1437
  /** Starts this service: seeds the scanner-id generator from our server name, then delegates. */
  void start(ZKWatcher zkWatcher) {
    this.scannerIdGenerator = new ScannerIdGenerator(this.server.getServerName());
    internalStart(zkWatcher);
  }
1442
  /** Stops this service: closes every open scanner first, then delegates. */
  void stop() {
    closeAllScanners();
    internalStop();
  }
1447
1448  /**
1449   * Called to verify that this server is up and running.
1450   */
1451  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
1452  protected void checkOpen() throws IOException {
1453    if (server.isAborted()) {
1454      throw new RegionServerAbortedException("Server " + server.getServerName() + " aborting");
1455    }
1456    if (server.isStopped()) {
1457      throw new RegionServerStoppedException("Server " + server.getServerName() + " stopping");
1458    }
1459    if (!server.isDataFileSystemOk()) {
1460      throw new RegionServerStoppedException("File system not available");
1461    }
1462    if (!server.isOnline()) {
1463      throw new ServerNotRunningYetException(
1464        "Server " + server.getServerName() + " is not running yet");
1465    }
1466  }
1467
1468  /**
1469   * By default, put up an Admin and a Client Service. Set booleans
1470   * <code>hbase.regionserver.admin.executorService</code> and
1471   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
1472   * Default is that both are enabled.
1473   * @return immutable list of blocking services and the security info classes that this server
1474   *         supports
1475   */
1476  protected List<BlockingServiceAndInterface> getServices() {
1477    boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
1478    boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
1479    boolean clientMeta =
1480      getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
1481    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
1482    if (client) {
1483      bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
1484        ClientService.BlockingInterface.class));
1485    }
1486    if (admin) {
1487      bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
1488        AdminService.BlockingInterface.class));
1489    }
1490    if (clientMeta) {
1491      bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
1492        ClientMetaService.BlockingInterface.class));
1493    }
1494    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
1495  }
1496
1497  /**
1498   * Close a region on the region server.
1499   * @param controller the RPC controller
1500   * @param request    the request n
1501   */
1502  @Override
1503  @QosPriority(priority = HConstants.ADMIN_QOS)
1504  public CloseRegionResponse closeRegion(final RpcController controller,
1505    final CloseRegionRequest request) throws ServiceException {
1506    final ServerName sn = (request.hasDestinationServer()
1507      ? ProtobufUtil.toServerName(request.getDestinationServer())
1508      : null);
1509
1510    try {
1511      checkOpen();
1512      throwOnWrongStartCode(request);
1513      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1514
1515      requestCount.increment();
1516      if (sn == null) {
1517        LOG.info("Close " + encodedRegionName + " without moving");
1518      } else {
1519        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1520      }
1521      boolean closed = server.closeRegion(encodedRegionName, false, sn);
1522      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1523      return builder.build();
1524    } catch (IOException ie) {
1525      throw new ServiceException(ie);
1526    }
1527  }
1528
1529  /**
1530   * Compact a region on the region server.
1531   * @param controller the RPC controller
1532   * @param request    the request n
1533   */
1534  @Override
1535  @QosPriority(priority = HConstants.ADMIN_QOS)
1536  public CompactRegionResponse compactRegion(final RpcController controller,
1537    final CompactRegionRequest request) throws ServiceException {
1538    try {
1539      checkOpen();
1540      requestCount.increment();
1541      HRegion region = getRegion(request.getRegion());
1542      // Quota support is enabled, the requesting user is not system/super user
1543      // and a quota policy is enforced that disables compactions.
1544      if (
1545        QuotaUtil.isQuotaEnabled(getConfiguration())
1546          && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null))
1547          && this.server.getRegionServerSpaceQuotaManager()
1548            .areCompactionsDisabled(region.getTableDescriptor().getTableName())
1549      ) {
1550        throw new DoNotRetryIOException(
1551          "Compactions on this region are " + "disabled due to a space quota violation.");
1552      }
1553      region.startRegionOperation(Operation.COMPACT_REGION);
1554      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1555      boolean major = request.hasMajor() && request.getMajor();
1556      if (request.hasFamily()) {
1557        byte[] family = request.getFamily().toByteArray();
1558        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
1559          + region.getRegionInfo().getRegionNameAsString() + " and family "
1560          + Bytes.toString(family);
1561        LOG.trace(log);
1562        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
1563          CompactionLifeCycleTracker.DUMMY);
1564      } else {
1565        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
1566          + region.getRegionInfo().getRegionNameAsString();
1567        LOG.trace(log);
1568        region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
1569      }
1570      return CompactRegionResponse.newBuilder().build();
1571    } catch (IOException ie) {
1572      throw new ServiceException(ie);
1573    }
1574  }
1575
1576  @Override
1577  public CompactionSwitchResponse compactionSwitch(RpcController controller,
1578    CompactionSwitchRequest request) throws ServiceException {
1579    rpcPreCheck("compactionSwitch");
1580    final CompactSplit compactSplitThread = server.getCompactSplitThread();
1581    requestCount.increment();
1582    boolean prevState = compactSplitThread.isCompactionsEnabled();
1583    CompactionSwitchResponse response =
1584      CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
1585    if (prevState == request.getEnabled()) {
1586      // passed in requested state is same as current state. No action required
1587      return response;
1588    }
1589    compactSplitThread.switchCompaction(request.getEnabled());
1590    return response;
1591  }
1592
1593  /**
1594   * Flush a region on the region server.
1595   * @param controller the RPC controller
1596   * @param request    the request n
1597   */
1598  @Override
1599  @QosPriority(priority = HConstants.ADMIN_QOS)
1600  public FlushRegionResponse flushRegion(final RpcController controller,
1601    final FlushRegionRequest request) throws ServiceException {
1602    try {
1603      checkOpen();
1604      requestCount.increment();
1605      HRegion region = getRegion(request.getRegion());
1606      LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1607      boolean shouldFlush = true;
1608      if (request.hasIfOlderThanTs()) {
1609        shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1610      }
1611      FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1612      if (shouldFlush) {
1613        boolean writeFlushWalMarker =
1614          request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false;
1615        // Go behind the curtain so we can manage writing of the flush WAL marker
1616        HRegion.FlushResultImpl flushResult = null;
1617        if (request.hasFamily()) {
1618          List families = new ArrayList();
1619          families.add(request.getFamily().toByteArray());
1620          flushResult =
1621            region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1622        } else {
1623          flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1624        }
1625        boolean compactionNeeded = flushResult.isCompactionNeeded();
1626        if (compactionNeeded) {
1627          server.getCompactSplitThread().requestSystemCompaction(region,
1628            "Compaction through user triggered flush");
1629        }
1630        builder.setFlushed(flushResult.isFlushSucceeded());
1631        builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1632      }
1633      builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1634      return builder.build();
1635    } catch (DroppedSnapshotException ex) {
1636      // Cache flush can fail in a few places. If it fails in a critical
1637      // section, we get a DroppedSnapshotException and a replay of wal
1638      // is required. Currently the only way to do this is a restart of
1639      // the server.
1640      server.abort("Replay of WAL required. Forcing server shutdown", ex);
1641      throw new ServiceException(ex);
1642    } catch (IOException ie) {
1643      throw new ServiceException(ie);
1644    }
1645  }
1646
1647  @Override
1648  @QosPriority(priority = HConstants.ADMIN_QOS)
1649  public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1650    final GetOnlineRegionRequest request) throws ServiceException {
1651    try {
1652      checkOpen();
1653      requestCount.increment();
1654      Map<String, HRegion> onlineRegions = server.getOnlineRegions();
1655      List<RegionInfo> list = new ArrayList<>(onlineRegions.size());
1656      for (HRegion region : onlineRegions.values()) {
1657        list.add(region.getRegionInfo());
1658      }
1659      list.sort(RegionInfo.COMPARATOR);
1660      return ResponseConverter.buildGetOnlineRegionResponse(list);
1661    } catch (IOException ie) {
1662      throw new ServiceException(ie);
1663    }
1664  }
1665
  // Master implementation of this Admin Service differs given it is not
  // able to supply detail only known to RegionServer. See note on
  // MasterRpcServers#getRegionInfo.
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
    final GetRegionInfoRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      RegionInfo info = region.getRegionInfo();
      byte[] bestSplitRow;
      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
        bestSplitRow = region.checkSplit(true).orElse(null);
        // when all table data are in memstore, bestSplitRow = null
        // try to flush region first
        if (bestSplitRow == null) {
          region.flush(true);
          bestSplitRow = region.checkSplit(true).orElse(null);
        }
      } else {
        bestSplitRow = null;
      }
      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
      // Compaction state is only computed when explicitly requested.
      if (request.hasCompactionState() && request.getCompactionState()) {
        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
      }
      builder.setSplittable(region.isSplittable());
      builder.setMergeable(region.isMergeable());
      // Only include a best split row when one was requested and actually found.
      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }
1705
1706  @Override
1707  @QosPriority(priority = HConstants.ADMIN_QOS)
1708  public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
1709    throws ServiceException {
1710
1711    List<HRegion> regions;
1712    if (request.hasTableName()) {
1713      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
1714      regions = server.getRegions(tableName);
1715    } else {
1716      regions = server.getRegions();
1717    }
1718    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
1719    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
1720    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1721
1722    try {
1723      for (HRegion region : regions) {
1724        rLoads.add(server.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
1725      }
1726    } catch (IOException e) {
1727      throw new ServiceException(e);
1728    }
1729    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
1730    builder.addAllRegionLoads(rLoads);
1731    return builder.build();
1732  }
1733
  /**
   * Clears the named compaction queues ("long" and/or "short"). Guarded by a compare-and-set so
   * that only one admin-initiated clear runs at a time; a concurrent request is logged and
   * ignored, returning an empty (success) response.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
    ClearCompactionQueuesRequest request) throws ServiceException {
    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
      + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
    requestCount.increment();
    // Only one clear operation may run at a time; winner flips the flag, loser just logs.
    if (clearCompactionQueues.compareAndSet(false, true)) {
      final CompactSplit compactSplitThread = server.getCompactSplitThread();
      try {
        checkOpen();
        server.getRegionServerCoprocessorHost().preClearCompactionQueues();
        for (String queueName : request.getQueueNameList()) {
          LOG.debug("clear " + queueName + " compaction queue");
          switch (queueName) {
            case "long":
              compactSplitThread.clearLongCompactionsQueue();
              break;
            case "short":
              compactSplitThread.clearShortCompactionsQueue();
              break;
            default:
              LOG.warn("Unknown queue name " + queueName);
              throw new IOException("Unknown queue name " + queueName);
          }
        }
        server.getRegionServerCoprocessorHost().postClearCompactionQueues();
      } catch (IOException ie) {
        throw new ServiceException(ie);
      } finally {
        // Always release the guard, even on failure, so later requests can proceed.
        clearCompactionQueues.set(false);
      }
    } else {
      LOG.warn("Clear compactions queue is executing by other admin.");
    }
    return respBuilder.build();
  }
1772
1773  /**
1774   * Get some information of the region server.
1775   * @param controller the RPC controller
1776   * @param request    the request n
1777   */
1778  @Override
1779  @QosPriority(priority = HConstants.ADMIN_QOS)
1780  public GetServerInfoResponse getServerInfo(final RpcController controller,
1781    final GetServerInfoRequest request) throws ServiceException {
1782    try {
1783      checkOpen();
1784    } catch (IOException ie) {
1785      throw new ServiceException(ie);
1786    }
1787    requestCount.increment();
1788    int infoPort = server.getInfoServer() != null ? server.getInfoServer().getPort() : -1;
1789    return ResponseConverter.buildGetServerInfoResponse(server.getServerName(), infoPort);
1790  }
1791
1792  @Override
1793  @QosPriority(priority = HConstants.ADMIN_QOS)
1794  public GetStoreFileResponse getStoreFile(final RpcController controller,
1795    final GetStoreFileRequest request) throws ServiceException {
1796    try {
1797      checkOpen();
1798      HRegion region = getRegion(request.getRegion());
1799      requestCount.increment();
1800      Set<byte[]> columnFamilies;
1801      if (request.getFamilyCount() == 0) {
1802        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
1803      } else {
1804        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
1805        for (ByteString cf : request.getFamilyList()) {
1806          columnFamilies.add(cf.toByteArray());
1807        }
1808      }
1809      int nCF = columnFamilies.size();
1810      List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
1811      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1812      builder.addAllStoreFile(fileList);
1813      return builder.build();
1814    } catch (IOException ie) {
1815      throw new ServiceException(ie);
1816    }
1817  }
1818
1819  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
1820    if (!request.hasServerStartCode()) {
1821      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
1822      return;
1823    }
1824    throwOnWrongStartCode(request.getServerStartCode());
1825  }
1826
1827  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
1828    if (!request.hasServerStartCode()) {
1829      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
1830      return;
1831    }
1832    throwOnWrongStartCode(request.getServerStartCode());
1833  }
1834
1835  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
1836    // check that we are the same server that this RPC is intended for.
1837    if (server.getServerName().getStartcode() != serverStartCode) {
1838      throw new ServiceException(new DoNotRetryIOException(
1839        "This RPC was intended for a " + "different server with startCode: " + serverStartCode
1840          + ", this server is: " + server.getServerName()));
1841    }
1842  }
1843
1844  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
1845    if (req.getOpenRegionCount() > 0) {
1846      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
1847        throwOnWrongStartCode(openReq);
1848      }
1849    }
1850    if (req.getCloseRegionCount() > 0) {
1851      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
1852        throwOnWrongStartCode(closeReq);
1853      }
1854    }
1855  }
1856
1857  /**
1858   * Open asynchronously a region or a set of regions on the region server. The opening is
1859   * coordinated by ZooKeeper, and this method requires the znode to be created before being called.
1860   * As a consequence, this method should be called only from the master.
1861   * <p>
1862   * Different manages states for the region are:
1863   * </p>
1864   * <ul>
1865   * <li>region not opened: the region opening will start asynchronously.</li>
1866   * <li>a close is already in progress: this is considered as an error.</li>
1867   * <li>an open is already in progress: this new open request will be ignored. This is important
1868   * because the Master can do multiple requests if it crashes.</li>
1869   * <li>the region is already opened: this new open request will be ignored.</li>
1870   * </ul>
1871   * <p>
1872   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
1873   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
1874   * errors are put in the response as FAILED_OPENING.
1875   * </p>
1876   * @param controller the RPC controller
1877   * @param request    the request n
1878   */
1879  @Override
1880  @QosPriority(priority = HConstants.ADMIN_QOS)
1881  public OpenRegionResponse openRegion(final RpcController controller,
1882    final OpenRegionRequest request) throws ServiceException {
1883    requestCount.increment();
1884    throwOnWrongStartCode(request);
1885
1886    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
1887    final int regionCount = request.getOpenInfoCount();
1888    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
1889    final boolean isBulkAssign = regionCount > 1;
1890    try {
1891      checkOpen();
1892    } catch (IOException ie) {
1893      TableName tableName = null;
1894      if (regionCount == 1) {
1895        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
1896          request.getOpenInfo(0).getRegion();
1897        if (ri != null) {
1898          tableName = ProtobufUtil.toTableName(ri.getTableName());
1899        }
1900      }
1901      if (!TableName.META_TABLE_NAME.equals(tableName)) {
1902        throw new ServiceException(ie);
1903      }
1904      // We are assigning meta, wait a little for regionserver to finish initialization.
1905      // Default to quarter of RPC timeout
1906      int timeout = server.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1907        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
1908      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
1909      synchronized (server.online) {
1910        try {
1911          while (
1912            EnvironmentEdgeManager.currentTime() <= endTime && !server.isStopped()
1913              && !server.isOnline()
1914          ) {
1915            server.online.wait(server.getMsgInterval());
1916          }
1917          checkOpen();
1918        } catch (InterruptedException t) {
1919          Thread.currentThread().interrupt();
1920          throw new ServiceException(t);
1921        } catch (IOException e) {
1922          throw new ServiceException(e);
1923        }
1924      }
1925    }
1926
1927    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1928
1929    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
1930      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
1931      TableDescriptor htd;
1932      try {
1933        String encodedName = region.getEncodedName();
1934        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
1935        final HRegion onlineRegion = server.getRegion(encodedName);
1936        if (onlineRegion != null) {
1937          // The region is already online. This should not happen any more.
1938          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
1939            + ", which is already online";
1940          LOG.warn(error);
1941          // server.abort(error);
1942          // throw new IOException(error);
1943          builder.addOpeningState(RegionOpeningState.OPENED);
1944          continue;
1945        }
1946        LOG.info("Open " + region.getRegionNameAsString());
1947
1948        final Boolean previous =
1949          server.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);
1950
1951        if (Boolean.FALSE.equals(previous)) {
1952          if (server.getRegion(encodedName) != null) {
1953            // There is a close in progress. This should not happen any more.
1954            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
1955              + ", which we are already trying to CLOSE";
1956            server.abort(error);
1957            throw new IOException(error);
1958          }
1959          server.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
1960        }
1961
1962        if (Boolean.TRUE.equals(previous)) {
1963          // An open is in progress. This is supported, but let's log this.
1964          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
1965            + ", which we are already trying to OPEN"
1966            + " - ignoring this new request for this region.");
1967        }
1968
1969        // We are opening this region. If it moves back and forth for whatever reason, we don't
1970        // want to keep returning the stale moved record while we are opening/if we close again.
1971        server.removeFromMovedRegions(region.getEncodedName());
1972
1973        if (previous == null || !previous.booleanValue()) {
1974          htd = htds.get(region.getTable());
1975          if (htd == null) {
1976            htd = server.getTableDescriptors().get(region.getTable());
1977            htds.put(region.getTable(), htd);
1978          }
1979          if (htd == null) {
1980            throw new IOException("Missing table descriptor for " + region.getEncodedName());
1981          }
1982          // If there is no action in progress, we can submit a specific handler.
1983          // Need to pass the expected version in the constructor.
1984          if (server.getExecutorService() == null) {
1985            LOG.info("No executor executorService; skipping open request");
1986          } else {
1987            if (region.isMetaRegion()) {
1988              server.getExecutorService()
1989                .submit(new OpenMetaHandler(server, server, region, htd, masterSystemTime));
1990            } else {
1991              if (regionOpenInfo.getFavoredNodesCount() > 0) {
1992                server.updateRegionFavoredNodesMapping(region.getEncodedName(),
1993                  regionOpenInfo.getFavoredNodesList());
1994              }
1995              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
1996                server.getExecutorService().submit(
1997                  new OpenPriorityRegionHandler(server, server, region, htd, masterSystemTime));
1998              } else {
1999                server.getExecutorService()
2000                  .submit(new OpenRegionHandler(server, server, region, htd, masterSystemTime));
2001              }
2002            }
2003          }
2004        }
2005
2006        builder.addOpeningState(RegionOpeningState.OPENED);
2007      } catch (IOException ie) {
2008        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
2009        if (isBulkAssign) {
2010          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
2011        } else {
2012          throw new ServiceException(ie);
2013        }
2014      }
2015    }
2016    return builder.build();
2017  }
2018
2019  /**
2020   * Warmup a region on this server. This method should only be called by Master. It synchronously
2021   * opens the region and closes the region bringing the most important pages in cache.
2022   */
2023  @Override
2024  public WarmupRegionResponse warmupRegion(final RpcController controller,
2025    final WarmupRegionRequest request) throws ServiceException {
2026    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
2027    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
2028    try {
2029      checkOpen();
2030      String encodedName = region.getEncodedName();
2031      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
2032      final HRegion onlineRegion = server.getRegion(encodedName);
2033      if (onlineRegion != null) {
2034        LOG.info("{} is online; skipping warmup", region);
2035        return response;
2036      }
2037      TableDescriptor htd = server.getTableDescriptors().get(region.getTable());
2038      if (server.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
2039        LOG.info("{} is in transition; skipping warmup", region);
2040        return response;
2041      }
2042      LOG.info("Warmup {}", region.getRegionNameAsString());
2043      HRegion.warmupHRegion(region, htd, server.getWAL(region), server.getConfiguration(), server,
2044        null);
2045    } catch (IOException ie) {
2046      LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
2047      throw new ServiceException(ie);
2048    }
2049
2050    return response;
2051  }
2052
2053  private CellScanner getAndReset(RpcController controller) {
2054    HBaseRpcController hrc = (HBaseRpcController) controller;
2055    CellScanner cells = hrc.cellScanner();
2056    hrc.setCellScanner(null);
2057    return cells;
2058  }
2059
2060  /**
2061   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
2062   * that the given mutations will be durable on the receiving RS if this method returns without any
2063   * exception.
2064   * @param controller the RPC controller
2065   * @param request    the request
2066   * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for
2067   *             compatibility with old region replica implementation. Now we will use
2068   *             {@code replicateToReplica} method instead.
2069   */
2070  @Deprecated
2071  @Override
2072  @QosPriority(priority = HConstants.REPLAY_QOS)
2073  public ReplicateWALEntryResponse replay(final RpcController controller,
2074    final ReplicateWALEntryRequest request) throws ServiceException {
2075    long before = EnvironmentEdgeManager.currentTime();
2076    CellScanner cells = getAndReset(controller);
2077    try {
2078      checkOpen();
2079      List<WALEntry> entries = request.getEntryList();
2080      if (entries == null || entries.isEmpty()) {
2081        // empty input
2082        return ReplicateWALEntryResponse.newBuilder().build();
2083      }
2084      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
2085      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
2086      RegionCoprocessorHost coprocessorHost =
2087        ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
2088          ? region.getCoprocessorHost()
2089          : null; // do not invoke coprocessors if this is a secondary region replica
2090      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();
2091
2092      // Skip adding the edits to WAL if this is a secondary region replica
2093      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
2094      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
2095
2096      for (WALEntry entry : entries) {
2097        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
2098          throw new NotServingRegionException("Replay request contains entries from multiple "
2099            + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
2100            + entry.getKey().getEncodedRegionName());
2101        }
2102        if (server.nonceManager != null && isPrimary) {
2103          long nonceGroup =
2104            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
2105          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
2106          server.nonceManager.reportOperationFromWal(nonceGroup, nonce,
2107            entry.getKey().getWriteTime());
2108        }
2109        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
2110        List<MutationReplay> edits =
2111          WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
2112        if (coprocessorHost != null) {
2113          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
2114          // KeyValue.
2115          if (
2116            coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
2117              walEntry.getSecond())
2118          ) {
2119            // if bypass this log entry, ignore it ...
2120            continue;
2121          }
2122          walEntries.add(walEntry);
2123        }
2124        if (edits != null && !edits.isEmpty()) {
2125          // HBASE-17924
2126          // sort to improve lock efficiency
2127          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
2128          long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
2129            ? entry.getKey().getOrigSequenceNumber()
2130            : entry.getKey().getLogSequenceNumber();
2131          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
2132          // check if it's a partial success
2133          for (int i = 0; result != null && i < result.length; i++) {
2134            if (result[i] != OperationStatus.SUCCESS) {
2135              throw new IOException(result[i].getExceptionMsg());
2136            }
2137          }
2138        }
2139      }
2140
2141      // sync wal at the end because ASYNC_WAL is used above
2142      WAL wal = region.getWAL();
2143      if (wal != null) {
2144        wal.sync();
2145      }
2146
2147      if (coprocessorHost != null) {
2148        for (Pair<WALKey, WALEdit> entry : walEntries) {
2149          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
2150            entry.getSecond());
2151        }
2152      }
2153      return ReplicateWALEntryResponse.newBuilder().build();
2154    } catch (IOException ie) {
2155      throw new ServiceException(ie);
2156    } finally {
2157      final MetricsRegionServer metricsRegionServer = server.getMetrics();
2158      if (metricsRegionServer != null) {
2159        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
2160      }
2161    }
2162  }
2163
2164  /**
2165   * Replay the given changes on a secondary replica
2166   */
2167  @Override
2168  public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
2169    ReplicateWALEntryRequest request) throws ServiceException {
2170    CellScanner cells = getAndReset(controller);
2171    try {
2172      checkOpen();
2173      List<WALEntry> entries = request.getEntryList();
2174      if (entries == null || entries.isEmpty()) {
2175        // empty input
2176        return ReplicateWALEntryResponse.newBuilder().build();
2177      }
2178      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
2179      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
2180      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2181        throw new DoNotRetryIOException(
2182          "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
2183      }
2184      for (WALEntry entry : entries) {
2185        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
2186          throw new NotServingRegionException(
2187            "ReplicateToReplica request contains entries from multiple " + "regions. First region:"
2188              + regionName.toStringUtf8() + " , other region:"
2189              + entry.getKey().getEncodedRegionName());
2190        }
2191        region.replayWALEntry(entry, cells);
2192      }
2193      return ReplicateWALEntryResponse.newBuilder().build();
2194    } catch (IOException ie) {
2195      throw new ServiceException(ie);
2196    }
2197  }
2198
2199  private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
2200    ReplicationSourceService replicationSource = server.getReplicationSourceService();
2201    if (replicationSource == null || entries.isEmpty()) {
2202      return;
2203    }
2204    // We can ensure that all entries are for one peer, so only need to check one entry's
2205    // table name. if the table hit sync replication at peer side and the peer cluster
2206    // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply
2207    // those entries according to the design doc.
2208    TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray());
2209    if (
2210      replicationSource.getSyncReplicationPeerInfoProvider().checkState(table,
2211        RejectReplicationRequestStateChecker.get())
2212    ) {
2213      throw new DoNotRetryIOException(
2214        "Reject to apply to sink cluster because sync replication state of sink cluster "
2215          + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table);
2216    }
2217  }
2218
2219  /**
2220   * Replicate WAL entries on the region server.
2221   * @param controller the RPC controller
2222   * @param request    the request
2223   */
2224  @Override
2225  @QosPriority(priority = HConstants.REPLICATION_QOS)
2226  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
2227    final ReplicateWALEntryRequest request) throws ServiceException {
2228    try {
2229      checkOpen();
2230      if (server.getReplicationSinkService() != null) {
2231        requestCount.increment();
2232        List<WALEntry> entries = request.getEntryList();
2233        checkShouldRejectReplicationRequest(entries);
2234        CellScanner cellScanner = getAndReset(controller);
2235        server.getRegionServerCoprocessorHost().preReplicateLogEntries();
2236        server.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
2237          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
2238          request.getSourceHFileArchiveDirPath());
2239        server.getRegionServerCoprocessorHost().postReplicateLogEntries();
2240        return ReplicateWALEntryResponse.newBuilder().build();
2241      } else {
2242        throw new ServiceException("Replication services are not initialized yet");
2243      }
2244    } catch (IOException ie) {
2245      throw new ServiceException(ie);
2246    }
2247  }
2248
2249  /**
2250   * Roll the WAL writer of the region server.
2251   * @param controller the RPC controller
2252   * @param request    the request n
2253   */
2254  @Override
2255  public RollWALWriterResponse rollWALWriter(final RpcController controller,
2256    final RollWALWriterRequest request) throws ServiceException {
2257    try {
2258      checkOpen();
2259      requestCount.increment();
2260      server.getRegionServerCoprocessorHost().preRollWALWriterRequest();
2261      server.getWalRoller().requestRollAll();
2262      server.getRegionServerCoprocessorHost().postRollWALWriterRequest();
2263      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
2264      return builder.build();
2265    } catch (IOException ie) {
2266      throw new ServiceException(ie);
2267    }
2268  }
2269
2270  /**
2271   * Stop the region server.
2272   * @param controller the RPC controller
2273   * @param request    the request n
2274   */
2275  @Override
2276  @QosPriority(priority = HConstants.ADMIN_QOS)
2277  public StopServerResponse stopServer(final RpcController controller,
2278    final StopServerRequest request) throws ServiceException {
2279    rpcPreCheck("stopServer");
2280    requestCount.increment();
2281    String reason = request.getReason();
2282    server.stop(reason);
2283    return StopServerResponse.newBuilder().build();
2284  }
2285
2286  @Override
2287  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
2288    UpdateFavoredNodesRequest request) throws ServiceException {
2289    rpcPreCheck("updateFavoredNodes");
2290    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
2291    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
2292    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
2293      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
2294      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
2295        server.updateRegionFavoredNodesMapping(hri.getEncodedName(),
2296          regionUpdateInfo.getFavoredNodesList());
2297      }
2298    }
2299    respBuilder.setResponse(openInfoList.size());
2300    return respBuilder.build();
2301  }
2302
2303  /**
2304   * Atomically bulk load several HFiles into an open region
2305   * @return true if successful, false is failed but recoverably (no action)
2306   * @throws ServiceException if failed unrecoverably
2307   */
2308  @Override
2309  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
2310    final BulkLoadHFileRequest request) throws ServiceException {
2311    long start = EnvironmentEdgeManager.currentTime();
2312    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
2313    if (clusterIds.contains(this.server.getClusterId())) {
2314      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
2315    } else {
2316      clusterIds.add(this.server.getClusterId());
2317    }
2318    try {
2319      checkOpen();
2320      requestCount.increment();
2321      HRegion region = getRegion(request.getRegion());
2322      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
2323      long sizeToBeLoaded = -1;
2324
2325      // Check to see if this bulk load would exceed the space quota for this table
2326      if (spaceQuotaEnabled) {
2327        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
2328        SpaceViolationPolicyEnforcement enforcement =
2329          activeSpaceQuotas.getPolicyEnforcement(region);
2330        if (enforcement != null) {
2331          // Bulk loads must still be atomic. We must enact all or none.
2332          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
2333          for (FamilyPath familyPath : request.getFamilyPathList()) {
2334            filePaths.add(familyPath.getPath());
2335          }
2336          // Check if the batch of files exceeds the current quota
2337          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
2338        }
2339      }
2340      // secure bulk load
2341      Map<byte[], List<Path>> map =
2342        server.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
2343      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
2344      builder.setLoaded(map != null);
2345      if (map != null) {
2346        // Treat any negative size as a flag to "ignore" updating the region size as that is
2347        // not possible to occur in real life (cannot bulk load a file with negative size)
2348        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
2349          if (LOG.isTraceEnabled()) {
2350            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
2351              + sizeToBeLoaded + " bytes");
2352          }
2353          // Inform space quotas of the new files for this region
2354          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
2355            sizeToBeLoaded);
2356        }
2357      }
2358      return builder.build();
2359    } catch (IOException ie) {
2360      throw new ServiceException(ie);
2361    } finally {
2362      final MetricsRegionServer metricsRegionServer = server.getMetrics();
2363      if (metricsRegionServer != null) {
2364        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
2365      }
2366    }
2367  }
2368
2369  @Override
2370  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
2371    PrepareBulkLoadRequest request) throws ServiceException {
2372    try {
2373      checkOpen();
2374      requestCount.increment();
2375
2376      HRegion region = getRegion(request.getRegion());
2377
2378      String bulkToken = server.getSecureBulkLoadManager().prepareBulkLoad(region, request);
2379      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
2380      builder.setBulkToken(bulkToken);
2381      return builder.build();
2382    } catch (IOException ie) {
2383      throw new ServiceException(ie);
2384    }
2385  }
2386
2387  @Override
2388  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
2389    CleanupBulkLoadRequest request) throws ServiceException {
2390    try {
2391      checkOpen();
2392      requestCount.increment();
2393
2394      HRegion region = getRegion(request.getRegion());
2395
2396      server.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
2397      return CleanupBulkLoadResponse.newBuilder().build();
2398    } catch (IOException ie) {
2399      throw new ServiceException(ie);
2400    }
2401  }
2402
2403  @Override
2404  public CoprocessorServiceResponse execService(final RpcController controller,
2405    final CoprocessorServiceRequest request) throws ServiceException {
2406    try {
2407      checkOpen();
2408      requestCount.increment();
2409      HRegion region = getRegion(request.getRegion());
2410      Message result = execServiceOnRegion(region, request.getCall());
2411      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
2412      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
2413        region.getRegionInfo().getRegionName()));
2414      // TODO: COPIES!!!!!!
2415      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
2416        org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
2417      return builder.build();
2418    } catch (IOException ie) {
2419      throw new ServiceException(ie);
2420    }
2421  }
2422
2423  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
2424    if (filePaths.isEmpty()) {
2425      // local hdfs
2426      return server.getFileSystem();
2427    }
2428    // source hdfs
2429    return new Path(filePaths.get(0)).getFileSystem(server.getConfiguration());
2430  }
2431
2432  private Message execServiceOnRegion(HRegion region,
2433    final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
2434    // ignore the passed in controller (from the serialized call)
2435    ServerRpcController execController = new ServerRpcController();
2436    return region.execService(execController, serviceCall);
2437  }
2438
2439  private boolean shouldRejectRequestsFromClient(HRegion region) {
2440    TableName table = region.getRegionInfo().getTable();
2441    ReplicationSourceService service = server.getReplicationSourceService();
2442    return service != null && service.getSyncReplicationPeerInfoProvider().checkState(table,
2443      RejectRequestsFromClientStateChecker.get());
2444  }
2445
2446  private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
2447    if (shouldRejectRequestsFromClient(region)) {
2448      throw new DoNotRetryIOException(
2449        region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state.");
2450    }
2451  }
2452
2453  /**
2454   * Get data from a table.
2455   * @param controller the RPC controller
2456   * @param request    the get request n
2457   */
2458  @Override
2459  public GetResponse get(final RpcController controller, final GetRequest request)
2460    throws ServiceException {
2461    long before = EnvironmentEdgeManager.currentTime();
2462    OperationQuota quota = null;
2463    HRegion region = null;
2464    try {
2465      checkOpen();
2466      requestCount.increment();
2467      rpcGetRequestCount.increment();
2468      region = getRegion(request.getRegion());
2469      rejectIfInStandByState(region);
2470
2471      GetResponse.Builder builder = GetResponse.newBuilder();
2472      ClientProtos.Get get = request.getGet();
2473      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
2474      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
2475      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
2476      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
2477      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
2478        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
2479          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
2480          + "reverse Scan.");
2481      }
2482      Boolean existence = null;
2483      Result r = null;
2484      RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2485      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
2486
2487      Get clientGet = ProtobufUtil.toGet(get);
2488      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2489        existence = region.getCoprocessorHost().preExists(clientGet);
2490      }
2491      if (existence == null) {
2492        if (context != null) {
2493          r = get(clientGet, (region), null, context);
2494        } else {
2495          // for test purpose
2496          r = region.get(clientGet);
2497        }
2498        if (get.getExistenceOnly()) {
2499          boolean exists = r.getExists();
2500          if (region.getCoprocessorHost() != null) {
2501            exists = region.getCoprocessorHost().postExists(clientGet, exists);
2502          }
2503          existence = exists;
2504        }
2505      }
2506      if (existence != null) {
2507        ClientProtos.Result pbr =
2508          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
2509        builder.setResult(pbr);
2510      } else if (r != null) {
2511        ClientProtos.Result pbr;
2512        if (
2513          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
2514            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
2515        ) {
2516          pbr = ProtobufUtil.toResultNoData(r);
2517          ((HBaseRpcController) controller)
2518            .setCellScanner(CellUtil.createCellScanner(r.rawCells()));
2519          addSize(context, r, null);
2520        } else {
2521          pbr = ProtobufUtil.toResult(r);
2522        }
2523        builder.setResult(pbr);
2524      }
2525      // r.cells is null when an table.exists(get) call
2526      if (r != null && r.rawCells() != null) {
2527        quota.addGetResult(r);
2528      }
2529      return builder.build();
2530    } catch (IOException ie) {
2531      throw new ServiceException(ie);
2532    } finally {
2533      final MetricsRegionServer metricsRegionServer = server.getMetrics();
2534      if (metricsRegionServer != null) {
2535        TableDescriptor td = region != null ? region.getTableDescriptor() : null;
2536        if (td != null) {
2537          metricsRegionServer.updateGet(td.getTableName(),
2538            EnvironmentEdgeManager.currentTime() - before);
2539        }
2540      }
2541      if (quota != null) {
2542        quota.close();
2543      }
2544    }
2545  }
2546
  /**
   * Server-side implementation of a single Get, executed as a one-shot Scan so the scanner can be
   * closed later via an RPC callback instead of eagerly.
   * NOTE(review): when closeCallBack is null, context must be non-null or the finally block below
   * throws NPE — visible callers always pass a non-null context in that case; confirm for new
   * call sites.
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
    RpcCallContext context) throws IOException {
    region.prepareGet(get);
    // Results from a non-default replica are marked stale.
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    long before = EnvironmentEdgeManager.currentTime();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        // Coprocessor short-circuited the get; return whatever it put into results.
        region.metricsUpdateForGet(results, before);
        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
          stale);
      }
    }
    // Translate the Get into an equivalent single-row Scan.
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet(results, before);

    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
  }
2597
2598  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
2599    int sum = 0;
2600    String firstRegionName = null;
2601    for (RegionAction regionAction : request.getRegionActionList()) {
2602      if (sum == 0) {
2603        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
2604      }
2605      sum += regionAction.getActionCount();
2606    }
2607    if (sum > rowSizeWarnThreshold) {
2608      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
2609        + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
2610        + RpcServer.getRequestUserName().orElse(null) + "/"
2611        + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
2612      if (rejectRowsWithSizeOverThreshold) {
2613        throw new ServiceException(
2614          "Rejecting large batch operation for current batch with firstRegionName: "
2615            + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
2616            + rowSizeWarnThreshold);
2617      }
2618    }
2619  }
2620
2621  private void failRegionAction(MultiResponse.Builder responseBuilder,
2622    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
2623    CellScanner cellScanner, Throwable error) {
2624    rpcServer.getMetrics().exception(error);
2625    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
2626    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2627    // All Mutations in this RegionAction not executed as we can not see the Region online here
2628    // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
2629    // corresponding to these Mutations.
2630    if (cellScanner != null) {
2631      skipCellsForMutations(regionAction.getActionList(), cellScanner);
2632    }
2633  }
2634
2635  private boolean isReplicationRequest(Action action) {
2636    // replication request can only be put or delete.
2637    if (!action.hasMutation()) {
2638      return false;
2639    }
2640    MutationProto mutation = action.getMutation();
2641    MutationType type = mutation.getMutateType();
2642    if (type != MutationType.PUT && type != MutationType.DELETE) {
2643      return false;
2644    }
2645    // replication will set a special attribute so we can make use of it to decide whether a request
2646    // is for replication.
2647    return mutation.getAttributeList().stream().map(p -> p.getName())
2648      .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
2649  }
2650
2651  /**
2652   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2653   * @param rpcc    the RPC controller
2654   * @param request the multi request n
2655   */
2656  @Override
2657  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2658    throws ServiceException {
2659    try {
2660      checkOpen();
2661    } catch (IOException ie) {
2662      throw new ServiceException(ie);
2663    }
2664
2665    checkBatchSizeAndLogLargeSize(request);
2666
2667    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2668    // It is also the conduit via which we pass back data.
2669    HBaseRpcController controller = (HBaseRpcController) rpcc;
2670    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2671    if (controller != null) {
2672      controller.setCellScanner(null);
2673    }
2674
2675    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2676
2677    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2678    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2679    this.rpcMultiRequestCount.increment();
2680    this.requestCount.increment();
2681    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2682
2683    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
2684    // following logic is for backward compatibility as old clients still use
2685    // MultiRequest#condition in case of checkAndMutate with RowMutations.
2686    if (request.hasCondition()) {
2687      if (request.getRegionActionList().isEmpty()) {
2688        // If the region action list is empty, do nothing.
2689        responseBuilder.setProcessed(true);
2690        return responseBuilder.build();
2691      }
2692
2693      RegionAction regionAction = request.getRegionAction(0);
2694
2695      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
2696      // we can assume regionAction.getAtomic() is true here.
2697      assert regionAction.getAtomic();
2698
2699      OperationQuota quota;
2700      HRegion region;
2701      RegionSpecifier regionSpecifier = regionAction.getRegion();
2702
2703      try {
2704        region = getRegion(regionSpecifier);
2705        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
2706      } catch (IOException e) {
2707        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2708        return responseBuilder.build();
2709      }
2710
2711      try {
2712        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
2713        // We only allow replication in standby state and it will not set the atomic flag.
2714        if (rejectIfFromClient) {
2715          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2716            new DoNotRetryIOException(
2717              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2718          return responseBuilder.build();
2719        }
2720
2721        try {
2722          CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2723            cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
2724          responseBuilder.setProcessed(result.isSuccess());
2725          ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2726            ClientProtos.ResultOrException.newBuilder();
2727          for (int i = 0; i < regionAction.getActionCount(); i++) {
2728            // To unify the response format with doNonAtomicRegionMutation and read through
2729            // client's AsyncProcess we have to add an empty result instance per operation
2730            resultOrExceptionOrBuilder.clear();
2731            resultOrExceptionOrBuilder.setIndex(i);
2732            regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2733          }
2734        } catch (IOException e) {
2735          rpcServer.getMetrics().exception(e);
2736          // As it's an atomic operation with a condition, we may expect it's a global failure.
2737          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2738        }
2739      } finally {
2740        quota.close();
2741      }
2742
2743      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2744      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2745      if (regionLoadStats != null) {
2746        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
2747          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
2748      }
2749      return responseBuilder.build();
2750    }
2751
2752    // this will contain all the cells that we need to return. It's created later, if needed.
2753    List<CellScannable> cellsToReturn = null;
2754    RegionScannersCloseCallBack closeCallBack = null;
2755    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2756    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
2757      new HashMap<>(request.getRegionActionCount());
2758
2759    for (RegionAction regionAction : request.getRegionActionList()) {
2760      OperationQuota quota;
2761      HRegion region;
2762      RegionSpecifier regionSpecifier = regionAction.getRegion();
2763      regionActionResultBuilder.clear();
2764
2765      try {
2766        region = getRegion(regionSpecifier);
2767        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
2768      } catch (IOException e) {
2769        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2770        continue; // For this region it's a failure.
2771      }
2772
2773      try {
2774        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
2775
2776        if (regionAction.hasCondition()) {
2777          // We only allow replication in standby state and it will not set the atomic flag.
2778          if (rejectIfFromClient) {
2779            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2780              new DoNotRetryIOException(
2781                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2782            continue;
2783          }
2784
2785          try {
2786            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2787              ClientProtos.ResultOrException.newBuilder();
2788            if (regionAction.getActionCount() == 1) {
2789              CheckAndMutateResult result =
2790                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
2791                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
2792              regionActionResultBuilder.setProcessed(result.isSuccess());
2793              resultOrExceptionOrBuilder.setIndex(0);
2794              if (result.getResult() != null) {
2795                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
2796              }
2797              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2798            } else {
2799              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2800                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
2801              regionActionResultBuilder.setProcessed(result.isSuccess());
2802              for (int i = 0; i < regionAction.getActionCount(); i++) {
2803                if (i == 0 && result.getResult() != null) {
2804                  // Set the result of the Increment/Append operations to the first element of the
2805                  // ResultOrException list
2806                  resultOrExceptionOrBuilder.setIndex(i);
2807                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
2808                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
2809                  continue;
2810                }
2811                // To unify the response format with doNonAtomicRegionMutation and read through
2812                // client's AsyncProcess we have to add an empty result instance per operation
2813                resultOrExceptionOrBuilder.clear();
2814                resultOrExceptionOrBuilder.setIndex(i);
2815                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2816              }
2817            }
2818          } catch (IOException e) {
2819            rpcServer.getMetrics().exception(e);
2820            // As it's an atomic operation with a condition, we may expect it's a global failure.
2821            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2822          }
2823        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2824          // We only allow replication in standby state and it will not set the atomic flag.
2825          if (rejectIfFromClient) {
2826            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2827              new DoNotRetryIOException(
2828                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2829            continue;
2830          }
2831          try {
2832            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
2833              cellScanner, nonceGroup, spaceQuotaEnforcement);
2834            regionActionResultBuilder.setProcessed(true);
2835            // We no longer use MultiResponse#processed. Instead, we use
2836            // RegionActionResult#processed. This is for backward compatibility for old clients.
2837            responseBuilder.setProcessed(true);
2838          } catch (IOException e) {
2839            rpcServer.getMetrics().exception(e);
2840            // As it's atomic, we may expect it's a global failure.
2841            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2842          }
2843        } else {
2844          if (
2845            rejectIfFromClient && regionAction.getActionCount() > 0
2846              && !isReplicationRequest(regionAction.getAction(0))
2847          ) {
2848            // fail if it is not a replication request
2849            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2850              new DoNotRetryIOException(
2851                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2852            continue;
2853          }
2854          // doNonAtomicRegionMutation manages the exception internally
2855          if (context != null && closeCallBack == null) {
2856            // An RpcCallBack that creates a list of scanners that needs to perform callBack
2857            // operation on completion of multiGets.
2858            // Set this only once
2859            closeCallBack = new RegionScannersCloseCallBack();
2860            context.setCallBack(closeCallBack);
2861          }
2862          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2863            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
2864            spaceQuotaEnforcement);
2865        }
2866      } finally {
2867        quota.close();
2868      }
2869
2870      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2871      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2872      if (regionLoadStats != null) {
2873        regionStats.put(regionSpecifier, regionLoadStats);
2874      }
2875    }
2876    // Load the controller with the Cells to return.
2877    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2878      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2879    }
2880
2881    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
2882    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
2883      builder.addRegion(stat.getKey());
2884      builder.addStat(stat.getValue());
2885    }
2886    responseBuilder.setRegionStatistics(builder);
2887    return responseBuilder.build();
2888  }
2889
2890  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2891    if (cellScanner == null) {
2892      return;
2893    }
2894    for (Action action : actions) {
2895      skipCellsForMutation(action, cellScanner);
2896    }
2897  }
2898
2899  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2900    if (cellScanner == null) {
2901      return;
2902    }
2903    try {
2904      if (action.hasMutation()) {
2905        MutationProto m = action.getMutation();
2906        if (m.hasAssociatedCellCount()) {
2907          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2908            cellScanner.advance();
2909          }
2910        }
2911      }
2912    } catch (IOException e) {
2913      // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2914      // marked as failed as we could not see the Region here. At client side the top level
2915      // RegionAction exception will be considered first.
2916      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2917    }
2918  }
2919
2920  /**
2921   * Mutate data in a table.
2922   * @param rpcc    the RPC controller
2923   * @param request the mutate request
2924   */
2925  @Override
2926  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
2927    throws ServiceException {
2928    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2929    // It is also the conduit via which we pass back data.
2930    HBaseRpcController controller = (HBaseRpcController) rpcc;
2931    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2932    OperationQuota quota = null;
2933    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2934    // Clear scanner so we are not holding on to reference across call.
2935    if (controller != null) {
2936      controller.setCellScanner(null);
2937    }
2938    try {
2939      checkOpen();
2940      requestCount.increment();
2941      rpcMutateRequestCount.increment();
2942      HRegion region = getRegion(request.getRegion());
2943      rejectIfInStandByState(region);
2944      MutateResponse.Builder builder = MutateResponse.newBuilder();
2945      MutationProto mutation = request.getMutation();
2946      if (!region.getRegionInfo().isMetaRegion()) {
2947        server.getMemStoreFlusher().reclaimMemStoreMemory();
2948      }
2949      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2950      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
2951      ActivePolicyEnforcement spaceQuotaEnforcement =
2952        getSpaceQuotaManager().getActiveEnforcements();
2953
2954      if (request.hasCondition()) {
2955        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
2956          request.getCondition(), nonceGroup, spaceQuotaEnforcement);
2957        builder.setProcessed(result.isSuccess());
2958        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
2959        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
2960        if (clientCellBlockSupported) {
2961          addSize(context, result.getResult(), null);
2962        }
2963      } else {
2964        Result r = null;
2965        Boolean processed = null;
2966        MutationType type = mutation.getMutateType();
2967        switch (type) {
2968          case APPEND:
2969            // TODO: this doesn't actually check anything.
2970            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
2971            break;
2972          case INCREMENT:
2973            // TODO: this doesn't actually check anything.
2974            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
2975            break;
2976          case PUT:
2977            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
2978            processed = Boolean.TRUE;
2979            break;
2980          case DELETE:
2981            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
2982            processed = Boolean.TRUE;
2983            break;
2984          default:
2985            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
2986        }
2987        if (processed != null) {
2988          builder.setProcessed(processed);
2989        }
2990        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
2991        addResult(builder, r, controller, clientCellBlockSupported);
2992        if (clientCellBlockSupported) {
2993          addSize(context, r, null);
2994        }
2995      }
2996      return builder.build();
2997    } catch (IOException ie) {
2998      server.checkFileSystem();
2999      throw new ServiceException(ie);
3000    } finally {
3001      if (quota != null) {
3002        quota.close();
3003      }
3004    }
3005  }
3006
3007  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
3008    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
3009    long before = EnvironmentEdgeManager.currentTime();
3010    Put put = ProtobufUtil.toPut(mutation, cellScanner);
3011    checkCellSizeLimit(region, put);
3012    spaceQuota.getPolicyEnforcement(region).check(put);
3013    quota.addMutation(put);
3014    region.put(put);
3015
3016    MetricsRegionServer metricsRegionServer = server.getMetrics();
3017    if (metricsRegionServer != null) {
3018      long after = EnvironmentEdgeManager.currentTime();
3019      metricsRegionServer.updatePut(region.getRegionInfo().getTable(), after - before);
3020    }
3021  }
3022
3023  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
3024    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
3025    long before = EnvironmentEdgeManager.currentTime();
3026    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
3027    checkCellSizeLimit(region, delete);
3028    spaceQuota.getPolicyEnforcement(region).check(delete);
3029    quota.addMutation(delete);
3030    region.delete(delete);
3031
3032    MetricsRegionServer metricsRegionServer = server.getMetrics();
3033    if (metricsRegionServer != null) {
3034      long after = EnvironmentEdgeManager.currentTime();
3035      metricsRegionServer.updateDelete(region.getRegionInfo().getTable(), after - before);
3036    }
3037  }
3038
  /**
   * Executes a single-mutation checkAndMutate on the given region, honoring the coprocessor
   * pre/post hooks and updating checkAndMutate latency metrics.
   * @param region      the region to operate on
   * @param quota       operation quota the mutation is charged against
   * @param mutation    the proto mutation guarded by the condition
   * @param cellScanner cells associated with the mutation, may be null
   * @param condition   the condition that must hold for the mutation to be applied
   * @param nonceGroup  client nonce group used for operation idempotency
   * @param spaceQuota  active space-quota policy enforcement
   * @return the checkAndMutate result, possibly produced or replaced by coprocessors
   * @throws IOException if a limit check fails or the region operation fails
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    CheckAndMutateResult result = null;
    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
    quota.addMutation((Mutation) checkAndMutate.getAction());

    if (region.getCoprocessorHost() != null) {
      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
    }
    if (result == null) {
      // The pre hook did not short-circuit: run the region operation and the post hook.
      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
      if (region.getCoprocessorHost() != null) {
        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
      }
    }
    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updateCheckAndMutate(region.getRegionInfo().getTable(), after - before);

      // Also count the PUT/DELETE-specific variants; other mutate types only get the
      // generic checkAndMutate metric above.
      MutationType type = mutation.getMutateType();
      switch (type) {
        case PUT:
          metricsRegionServer.updateCheckAndPut(region.getRegionInfo().getTable(), after - before);
          break;
        case DELETE:
          metricsRegionServer.updateCheckAndDelete(region.getRegionInfo().getTable(),
            after - before);
          break;
        default:
          break;
      }
    }
    return result;
  }
3079
  // This is used to keep compatible with the old client implementation. Consider remove it if we
  // decide to drop the support of the client that still sends close request to a region scanner
  // which has already been exhausted.
  @Deprecated
  private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {

    private static final long serialVersionUID = -4305297078988180130L;

    // Shared singleton exception: suppress stack trace capture, since a trace would point at
    // this initializer rather than the throw site and filling it on every throw is wasted work.
    @Override
    public synchronized Throwable fillInStackTrace() {
      return this;
    }
  };
3093
3094  private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
3095    String scannerName = toScannerName(request.getScannerId());
3096    RegionScannerHolder rsh = this.scanners.get(scannerName);
3097    if (rsh == null) {
3098      // just ignore the next or close request if scanner does not exists.
3099      if (closedScanners.getIfPresent(scannerName) != null) {
3100        throw SCANNER_ALREADY_CLOSED;
3101      } else {
3102        LOG.warn("Client tried to access missing scanner " + scannerName);
3103        throw new UnknownScannerException(
3104          "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
3105            + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
3106            + "long wait between consecutive client checkins, c) Server may be closing down, "
3107            + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
3108            + "possible fix would be increasing the value of"
3109            + "'hbase.client.scanner.timeout.period' configuration.");
3110      }
3111    }
3112    rejectIfInStandByState(rsh.r);
3113    RegionInfo hri = rsh.s.getRegionInfo();
3114    // Yes, should be the same instance
3115    if (server.getOnlineRegion(hri.getRegionName()) != rsh.r) {
3116      String msg = "Region has changed on the scanner " + scannerName + ": regionName="
3117        + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r;
3118      LOG.warn(msg + ", closing...");
3119      scanners.remove(scannerName);
3120      try {
3121        rsh.s.close();
3122      } catch (IOException e) {
3123        LOG.warn("Getting exception closing " + scannerName, e);
3124      } finally {
3125        try {
3126          server.getLeaseManager().cancelLease(scannerName);
3127        } catch (LeaseException e) {
3128          LOG.warn("Getting exception closing " + scannerName, e);
3129        }
3130      }
3131      throw new NotServingRegionException(msg);
3132    }
3133    return rsh;
3134  }
3135
3136  /**
3137   * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
3138   *         value.
3139   */
3140  private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request,
3141    ScanResponse.Builder builder) throws IOException {
3142    HRegion region = getRegion(request.getRegion());
3143    rejectIfInStandByState(region);
3144    ClientProtos.Scan protoScan = request.getScan();
3145    boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
3146    Scan scan = ProtobufUtil.toScan(protoScan);
3147    // if the request doesn't set this, get the default region setting.
3148    if (!isLoadingCfsOnDemandSet) {
3149      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
3150    }
3151
3152    if (!scan.hasFamilies()) {
3153      // Adding all families to scanner
3154      for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
3155        scan.addFamily(family);
3156      }
3157    }
3158    if (region.getCoprocessorHost() != null) {
3159      // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
3160      // wrapper for the core created RegionScanner
3161      region.getCoprocessorHost().preScannerOpen(scan);
3162    }
3163    RegionScannerImpl coreScanner = region.getScanner(scan);
3164    Shipper shipper = coreScanner;
3165    RegionScanner scanner = coreScanner;
3166    try {
3167      if (region.getCoprocessorHost() != null) {
3168        scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
3169      }
3170    } catch (Exception e) {
3171      // Although region coprocessor is for advanced users and they should take care of the
3172      // implementation to not damage the HBase system, closing the scanner on exception here does
3173      // not have any bad side effect, so let's do it
3174      scanner.close();
3175      throw e;
3176    }
3177    long scannerId = scannerIdGenerator.generateNewScannerId();
3178    builder.setScannerId(scannerId);
3179    builder.setMvccReadPoint(scanner.getMvccReadPoint());
3180    builder.setTtl(scannerLeaseTimeoutPeriod);
3181    String scannerName = toScannerName(scannerId);
3182
3183    boolean fullRegionScan =
3184      !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region);
3185
3186    return new Pair<String, RegionScannerHolder>(scannerName,
3187      addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan));
3188  }
3189
3190  /**
3191   * The returned String is used as key doing look up of outstanding Scanners in this Servers'
3192   * this.scanners, the Map of outstanding scanners and their current state.
3193   * @param scannerId A scanner long id.
3194   * @return The long id as a String.
3195   */
3196  private static String toScannerName(long scannerId) {
3197    return Long.toString(scannerId);
3198  }
3199
3200  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
3201    throws OutOfOrderScannerNextException {
3202    // if nextCallSeq does not match throw Exception straight away. This needs to be
3203    // performed even before checking of Lease.
3204    // See HBASE-5974
3205    if (request.hasNextCallSeq()) {
3206      long callSeq = request.getNextCallSeq();
3207      if (!rsh.incNextCallSeq(callSeq)) {
3208        throw new OutOfOrderScannerNextException(
3209          "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: "
3210            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
3211      }
3212    }
3213  }
3214
3215  private void addScannerLeaseBack(LeaseManager.Lease lease) {
3216    try {
3217      server.getLeaseManager().addLease(lease);
3218    } catch (LeaseStillHeldException e) {
3219      // should not happen as the scanner id is unique.
3220      throw new AssertionError(e);
3221    }
3222  }
3223
3224  // visible for testing only
3225  long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller,
3226    boolean allowHeartbeatMessages) {
3227    // Set the time limit to be half of the more restrictive timeout value (one of the
3228    // timeout values must be positive). In the event that both values are positive, the
3229    // more restrictive of the two is used to calculate the limit.
3230    if (allowHeartbeatMessages) {
3231      long now = EnvironmentEdgeManager.currentTime();
3232      long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now);
3233      if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) {
3234        long timeLimitDelta;
3235        if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) {
3236          timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout);
3237        } else {
3238          timeLimitDelta =
3239            scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout;
3240        }
3241
3242        // Use half of whichever timeout value was more restrictive... But don't allow
3243        // the time limit to be less than the allowable minimum (could cause an
3244        // immediate timeout before scanning any data).
3245        timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
3246        return now + timeLimitDelta;
3247      }
3248    }
3249    // Default value of timeLimit is negative to indicate no timeLimit should be
3250    // enforced.
3251    return -1L;
3252  }
3253
3254  private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) {
3255    long timeout;
3256    if (controller != null && controller.getCallTimeout() > 0) {
3257      timeout = controller.getCallTimeout();
3258    } else if (rpcTimeout > 0) {
3259      timeout = rpcTimeout;
3260    } else {
3261      return -1;
3262    }
3263    if (call != null) {
3264      timeout -= (now - call.getReceiveTime());
3265    }
3266    // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high.
3267    // return minimum value here in that case so we count this in calculating the final delta.
3268    return Math.max(minimumScanTimeLimitDelta, timeout);
3269  }
3270
3271  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
3272    ScannerContext scannerContext, ScanResponse.Builder builder) {
3273    if (numOfCompleteRows >= limitOfRows) {
3274      if (LOG.isTraceEnabled()) {
3275        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows
3276          + " scannerContext: " + scannerContext);
3277      }
3278      builder.setMoreResults(false);
3279    }
3280  }
3281
3282  // return whether we have more results in region.
3283  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
3284    long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
3285    ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCall rpcCall)
3286    throws IOException {
3287    HRegion region = rsh.r;
3288    RegionScanner scanner = rsh.s;
3289    long maxResultSize;
3290    if (scanner.getMaxResultSize() > 0) {
3291      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
3292    } else {
3293      maxResultSize = maxQuotaResultSize;
3294    }
3295    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
3296    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
3297    // arbitrary 32. TODO: keep record of general size of results being returned.
3298    ArrayList<Cell> values = new ArrayList<>(32);
3299    region.startRegionOperation(Operation.SCAN);
3300    long before = EnvironmentEdgeManager.currentTime();
3301    // Used to check if we've matched the row limit set on the Scan
3302    int numOfCompleteRows = 0;
3303    // Count of times we call nextRaw; can be > numOfCompleteRows.
3304    int numOfNextRawCalls = 0;
3305    try {
3306      int numOfResults = 0;
3307      synchronized (scanner) {
3308        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
3309        boolean clientHandlesPartials =
3310          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
3311        boolean clientHandlesHeartbeats =
3312          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
3313
3314        // On the server side we must ensure that the correct ordering of partial results is
3315        // returned to the client to allow them to properly reconstruct the partial results.
3316        // If the coprocessor host is adding to the result list, we cannot guarantee the
3317        // correct ordering of partial results and so we prevent partial results from being
3318        // formed.
3319        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
3320        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
3321        boolean moreRows = false;
3322
3323        // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
3324        // certain time threshold on the server. When the time threshold is exceeded, the
3325        // server stops the scan and sends back whatever Results it has accumulated within
3326        // that time period (may be empty). Since heartbeat messages have the potential to
3327        // create partial Results (in the event that the timeout occurs in the middle of a
3328        // row), we must only generate heartbeat messages when the client can handle both
3329        // heartbeats AND partials
3330        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
3331
3332        long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);
3333
3334        final LimitScope sizeScope =
3335          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
3336        final LimitScope timeScope =
3337          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
3338
3339        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
3340
3341        // Configure with limits for this RPC. Set keep progress true since size progress
3342        // towards size limit should be kept between calls to nextRaw
3343        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
3344        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
3345        // of heap size occupied by cells(being read). Cell data means its key and value parts.
3346        contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize);
3347        contextBuilder.setBatchLimit(scanner.getBatch());
3348        contextBuilder.setTimeLimit(timeScope, timeLimit);
3349        contextBuilder.setTrackMetrics(trackMetrics);
3350        ScannerContext scannerContext = contextBuilder.build();
3351        boolean limitReached = false;
3352        while (numOfResults < maxResults) {
3353          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
3354          // batch limit is a limit on the number of cells per Result. Thus, if progress is
3355          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
3356          // reset the batch progress between nextRaw invocations since we don't want the
3357          // batch progress from previous calls to affect future calls
3358          scannerContext.setBatchProgress(0);
3359          assert values.isEmpty();
3360
3361          // Collect values to be returned here
3362          moreRows = scanner.nextRaw(values, scannerContext);
3363          if (rpcCall == null) {
3364            // When there is no RpcCallContext,copy EC to heap, then the scanner would close,
3365            // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
3366            // buffers.See more details in HBASE-26036.
3367            CellUtil.cloneIfNecessary(values);
3368          }
3369          numOfNextRawCalls++;
3370
3371          if (!values.isEmpty()) {
3372            if (limitOfRows > 0) {
3373              // First we need to check if the last result is partial and we have a row change. If
3374              // so then we need to increase the numOfCompleteRows.
3375              if (results.isEmpty()) {
3376                if (
3377                  rsh.rowOfLastPartialResult != null
3378                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
3379                ) {
3380                  numOfCompleteRows++;
3381                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
3382                    builder);
3383                }
3384              } else {
3385                Result lastResult = results.get(results.size() - 1);
3386                if (
3387                  lastResult.mayHaveMoreCellsInRow()
3388                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
3389                ) {
3390                  numOfCompleteRows++;
3391                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
3392                    builder);
3393                }
3394              }
3395              if (builder.hasMoreResults() && !builder.getMoreResults()) {
3396                break;
3397              }
3398            }
3399            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
3400            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
3401            lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
3402            results.add(r);
3403            numOfResults++;
3404            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
3405              numOfCompleteRows++;
3406              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
3407              if (builder.hasMoreResults() && !builder.getMoreResults()) {
3408                break;
3409              }
3410            }
3411          } else if (!moreRows && !results.isEmpty()) {
3412            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
3413            // last result is false. Otherwise it's possible that: the first nextRaw returned
3414            // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
3415            // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
3416            // return with moreRows=false, which means moreResultsInRegion would be false, it will
3417            // be a contradictory state (HBASE-21206).
3418            int lastIdx = results.size() - 1;
3419            Result r = results.get(lastIdx);
3420            if (r.mayHaveMoreCellsInRow()) {
3421              results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
3422            }
3423          }
3424          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
3425          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
3426          boolean resultsLimitReached = numOfResults >= maxResults;
3427          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;
3428
3429          if (limitReached || !moreRows) {
3430            // We only want to mark a ScanResponse as a heartbeat message in the event that
3431            // there are more values to be read server side. If there aren't more values,
3432            // marking it as a heartbeat is wasteful because the client will need to issue
3433            // another ScanRequest only to realize that they already have all the values
3434            if (moreRows && timeLimitReached) {
3435              // Heartbeat messages occur when the time limit has been reached.
3436              builder.setHeartbeatMessage(true);
3437              if (rsh.needCursor) {
3438                Cell cursorCell = scannerContext.getLastPeekedCell();
3439                if (cursorCell != null) {
3440                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
3441                }
3442              }
3443            }
3444            break;
3445          }
3446          values.clear();
3447        }
3448        builder.setMoreResultsInRegion(moreRows);
3449        // Check to see if the client requested that we track metrics server side. If the
3450        // client requested metrics, retrieve the metrics from the scanner context.
3451        if (trackMetrics) {
3452          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
3453          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
3454          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
3455
3456          for (Entry<String, Long> entry : metrics.entrySet()) {
3457            pairBuilder.setName(entry.getKey());
3458            pairBuilder.setValue(entry.getValue());
3459            metricBuilder.addMetrics(pairBuilder.build());
3460          }
3461
3462          builder.setScanMetrics(metricBuilder.build());
3463        }
3464      }
3465    } finally {
3466      region.closeRegionOperation();
3467      // Update serverside metrics, even on error.
3468      long end = EnvironmentEdgeManager.currentTime();
3469      long responseCellSize = rpcCall != null ? rpcCall.getResponseCellSize() : 0;
3470      region.getMetrics().updateScanTime(end - before);
3471      final MetricsRegionServer metricsRegionServer = server.getMetrics();
3472      if (metricsRegionServer != null) {
3473        metricsRegionServer.updateScanSize(region.getTableDescriptor().getTableName(),
3474          responseCellSize);
3475        metricsRegionServer.updateScanTime(region.getTableDescriptor().getTableName(),
3476          end - before);
3477        metricsRegionServer.updateReadQueryMeter(region.getRegionInfo().getTable(),
3478          numOfNextRawCalls);
3479      }
3480    }
3481    // coprocessor postNext hook
3482    if (region.getCoprocessorHost() != null) {
3483      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
3484    }
3485  }
3486
3487  /**
3488   * Scan data in a table.
3489   * @param controller the RPC controller
3490   * @param request    the scan request n
3491   */
3492  @Override
3493  public ScanResponse scan(final RpcController controller, final ScanRequest request)
3494    throws ServiceException {
3495    if (controller != null && !(controller instanceof HBaseRpcController)) {
3496      throw new UnsupportedOperationException(
3497        "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
3498    }
3499    if (!request.hasScannerId() && !request.hasScan()) {
3500      throw new ServiceException(
3501        new DoNotRetryIOException("Missing required input: scannerId or scan"));
3502    }
3503    try {
3504      checkOpen();
3505    } catch (IOException e) {
3506      if (request.hasScannerId()) {
3507        String scannerName = toScannerName(request.getScannerId());
3508        if (LOG.isDebugEnabled()) {
3509          LOG.debug(
3510            "Server shutting down and client tried to access missing scanner " + scannerName);
3511        }
3512        final LeaseManager leaseManager = server.getLeaseManager();
3513        if (leaseManager != null) {
3514          try {
3515            leaseManager.cancelLease(scannerName);
3516          } catch (LeaseException le) {
3517            // No problem, ignore
3518            if (LOG.isTraceEnabled()) {
3519              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
3520            }
3521          }
3522        }
3523      }
3524      throw new ServiceException(e);
3525    }
3526    requestCount.increment();
3527    rpcScanRequestCount.increment();
3528    RegionScannerHolder rsh;
3529    ScanResponse.Builder builder = ScanResponse.newBuilder();
3530    String scannerName;
3531    try {
3532      if (request.hasScannerId()) {
3533        // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
3534        // for more details.
3535        long scannerId = request.getScannerId();
3536        builder.setScannerId(scannerId);
3537        scannerName = toScannerName(scannerId);
3538        rsh = getRegionScanner(request);
3539      } else {
3540        Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder);
3541        scannerName = scannerNameAndRSH.getFirst();
3542        rsh = scannerNameAndRSH.getSecond();
3543      }
3544    } catch (IOException e) {
3545      if (e == SCANNER_ALREADY_CLOSED) {
3546        // Now we will close scanner automatically if there are no more results for this region but
3547        // the old client will still send a close request to us. Just ignore it and return.
3548        return builder.build();
3549      }
3550      throw new ServiceException(e);
3551    }
3552    if (rsh.fullRegionScan) {
3553      rpcFullScanRequestCount.increment();
3554    }
3555    HRegion region = rsh.r;
3556    LeaseManager.Lease lease;
3557    try {
3558      // Remove lease while its being processed in server; protects against case
3559      // where processing of request takes > lease expiration time. or null if none found.
3560      lease = server.getLeaseManager().removeLease(scannerName);
3561    } catch (LeaseException e) {
3562      throw new ServiceException(e);
3563    }
3564    if (request.hasRenew() && request.getRenew()) {
3565      // add back and return
3566      addScannerLeaseBack(lease);
3567      try {
3568        checkScanNextCallSeq(request, rsh);
3569      } catch (OutOfOrderScannerNextException e) {
3570        throw new ServiceException(e);
3571      }
3572      return builder.build();
3573    }
3574    OperationQuota quota;
3575    try {
3576      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
3577    } catch (IOException e) {
3578      addScannerLeaseBack(lease);
3579      throw new ServiceException(e);
3580    }
3581    try {
3582      checkScanNextCallSeq(request, rsh);
3583    } catch (OutOfOrderScannerNextException e) {
3584      addScannerLeaseBack(lease);
3585      throw new ServiceException(e);
3586    }
3587    // Now we have increased the next call sequence. If we give client an error, the retry will
3588    // never success. So we'd better close the scanner and return a DoNotRetryIOException to client
3589    // and then client will try to open a new scanner.
3590    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
3591    int rows; // this is scan.getCaching
3592    if (request.hasNumberOfRows()) {
3593      rows = request.getNumberOfRows();
3594    } else {
3595      rows = closeScanner ? 0 : 1;
3596    }
3597    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
3598    // now let's do the real scan.
3599    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
3600    RegionScanner scanner = rsh.s;
3601    // this is the limit of rows for this scan, if we the number of rows reach this value, we will
3602    // close the scanner.
3603    int limitOfRows;
3604    if (request.hasLimitOfRows()) {
3605      limitOfRows = request.getLimitOfRows();
3606    } else {
3607      limitOfRows = -1;
3608    }
3609    MutableObject<Object> lastBlock = new MutableObject<>();
3610    boolean scannerClosed = false;
3611    try {
3612      List<Result> results = new ArrayList<>(Math.min(rows, 512));
3613      if (rows > 0) {
3614        boolean done = false;
3615        // Call coprocessor. Get region info from scanner.
3616        if (region.getCoprocessorHost() != null) {
3617          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
3618          if (!results.isEmpty()) {
3619            for (Result r : results) {
3620              lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
3621            }
3622          }
3623          if (bypass != null && bypass.booleanValue()) {
3624            done = true;
3625          }
3626        }
3627        if (!done) {
3628          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
3629            results, builder, lastBlock, rpcCall);
3630        } else {
3631          builder.setMoreResultsInRegion(!results.isEmpty());
3632        }
3633      } else {
3634        // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
3635        builder.setMoreResultsInRegion(true);
3636      }
3637
3638      quota.addScanResult(results);
3639      addResults(builder, results, (HBaseRpcController) controller,
3640        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
3641        isClientCellBlockSupport(rpcCall));
3642      if (scanner.isFilterDone() && results.isEmpty()) {
3643        // If the scanner's filter - if any - is done with the scan
3644        // only set moreResults to false if the results is empty. This is used to keep compatible
3645        // with the old scan implementation where we just ignore the returned results if moreResults
3646        // is false. Can remove the isEmpty check after we get rid of the old implementation.
3647        builder.setMoreResults(false);
3648      }
3649      // Later we may close the scanner depending on this flag so here we need to make sure that we
3650      // have already set this flag.
3651      assert builder.hasMoreResultsInRegion();
3652      // we only set moreResults to false in the above code, so set it to true if we haven't set it
3653      // yet.
3654      if (!builder.hasMoreResults()) {
3655        builder.setMoreResults(true);
3656      }
3657      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
3658        // Record the last cell of the last result if it is a partial result
3659        // We need this to calculate the complete rows we have returned to client as the
3660        // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
3661        // current row. We may filter out all the remaining cells for the current row and just
3662        // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
3663        // check for row change.
3664        Result lastResult = results.get(results.size() - 1);
3665        if (lastResult.mayHaveMoreCellsInRow()) {
3666          rsh.rowOfLastPartialResult = lastResult.getRow();
3667        } else {
3668          rsh.rowOfLastPartialResult = null;
3669        }
3670      }
3671      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
3672        scannerClosed = true;
3673        closeScanner(region, scanner, scannerName, rpcCall);
3674      }
3675
3676      // There's no point returning to a timed out client. Throwing ensures scanner is closed
3677      if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) {
3678        throw new TimeoutIOException("Client deadline exceeded, cannot return results");
3679      }
3680
3681      return builder.build();
3682    } catch (IOException e) {
3683      try {
3684        // scanner is closed here
3685        scannerClosed = true;
3686        // The scanner state might be left in a dirty state, so we will tell the Client to
3687        // fail this RPC and close the scanner while opening up another one from the start of
3688        // row that the client has last seen.
3689        closeScanner(region, scanner, scannerName, rpcCall);
3690
3691        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
3692        // used in two different semantics.
3693        // (1) The first is to close the client scanner and bubble up the exception all the way
3694        // to the application. This is preferred when the exception is really un-recoverable
3695        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
3696        // bucket usually.
3697        // (2) Second semantics is to close the current region scanner only, but continue the
3698        // client scanner by overriding the exception. This is usually UnknownScannerException,
3699        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
3700        // application-level ClientScanner has to continue without bubbling up the exception to
3701        // the client. See ClientScanner code to see how it deals with these special exceptions.
3702        if (e instanceof DoNotRetryIOException) {
3703          throw e;
3704        }
3705
3706        // If it is a FileNotFoundException, wrap as a
3707        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
3708        if (e instanceof FileNotFoundException) {
3709          throw new DoNotRetryIOException(e);
3710        }
3711
3712        // We closed the scanner already. Instead of throwing the IOException, and client
3713        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
3714        // a special exception to save an RPC.
3715        if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
3716          // 1.4.0+ clients know how to handle
3717          throw new ScannerResetException("Scanner is closed on the server-side", e);
3718        } else {
3719          // older clients do not know about SRE. Just throw USE, which they will handle
3720          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
3721            + " scanner state for clients older than 1.3.", e);
3722        }
3723      } catch (IOException ioe) {
3724        throw new ServiceException(ioe);
3725      }
3726    } finally {
3727      if (!scannerClosed) {
3728        // Adding resets expiration time on lease.
3729        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
3730        if (rpcCall != null) {
3731          rpcCall.setCallBack(rsh.shippedCallback);
3732        } else {
3733          // If context is null,here we call rsh.shippedCallback directly to reuse the logic in
3734          // rsh.shippedCallback to release the internal resources in rsh,and lease is also added
3735          // back to regionserver's LeaseManager in rsh.shippedCallback.
3736          runShippedCallback(rsh);
3737        }
3738      }
3739      quota.close();
3740    }
3741  }
3742
3743  private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
3744    assert rsh.shippedCallback != null;
3745    try {
3746      rsh.shippedCallback.run();
3747    } catch (IOException ioe) {
3748      throw new ServiceException(ioe);
3749    }
3750  }
3751
3752  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
3753    RpcCallContext context) throws IOException {
3754    if (region.getCoprocessorHost() != null) {
3755      if (region.getCoprocessorHost().preScannerClose(scanner)) {
3756        // bypass the actual close.
3757        return;
3758      }
3759    }
3760    RegionScannerHolder rsh = scanners.remove(scannerName);
3761    if (rsh != null) {
3762      if (context != null) {
3763        context.setCallBack(rsh.closeCallBack);
3764      } else {
3765        rsh.s.close();
3766      }
3767      if (region.getCoprocessorHost() != null) {
3768        region.getCoprocessorHost().postScannerClose(scanner);
3769      }
3770      closedScanners.put(scannerName, scannerName);
3771    }
3772  }
3773
3774  @Override
3775  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
3776    CoprocessorServiceRequest request) throws ServiceException {
3777    rpcPreCheck("execRegionServerService");
3778    return server.execRegionServerService(controller, request);
3779  }
3780
3781  @Override
3782  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
3783    GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
3784    try {
3785      final RegionServerSpaceQuotaManager manager = server.getRegionServerSpaceQuotaManager();
3786      final GetSpaceQuotaSnapshotsResponse.Builder builder =
3787        GetSpaceQuotaSnapshotsResponse.newBuilder();
3788      if (manager != null) {
3789        final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
3790        for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
3791          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
3792            .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
3793            .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build());
3794        }
3795      }
3796      return builder.build();
3797    } catch (Exception e) {
3798      throw new ServiceException(e);
3799    }
3800  }
3801
3802  @Override
3803  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
3804    ClearRegionBlockCacheRequest request) throws ServiceException {
3805    rpcPreCheck("clearRegionBlockCache");
3806    ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder();
3807    CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
3808    List<HRegion> regions = getRegions(request.getRegionList(), stats);
3809    for (HRegion region : regions) {
3810      try {
3811        stats = stats.append(this.server.clearRegionBlockCache(region));
3812      } catch (Exception e) {
3813        stats.addException(region.getRegionInfo().getRegionName(), e);
3814      }
3815    }
3816    stats.withMaxCacheSize(server.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
3817    return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
3818  }
3819
3820  private void executeOpenRegionProcedures(OpenRegionRequest request,
3821    Map<TableName, TableDescriptor> tdCache) {
3822    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
3823    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
3824      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
3825      TableName tableName = regionInfo.getTable();
3826      TableDescriptor tableDesc = tdCache.get(tableName);
3827      if (tableDesc == null) {
3828        try {
3829          tableDesc = server.getTableDescriptors().get(regionInfo.getTable());
3830        } catch (IOException e) {
3831          // Here we do not fail the whole method since we also need deal with other
3832          // procedures, and we can not ignore this one, so we still schedule a
3833          // AssignRegionHandler and it will report back to master if we still can not get the
3834          // TableDescriptor.
3835          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
3836            regionInfo.getTable(), e);
3837        }
3838        if (tableDesc != null) {
3839          tdCache.put(tableName, tableDesc);
3840        }
3841      }
3842      if (regionOpenInfo.getFavoredNodesCount() > 0) {
3843        server.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
3844          regionOpenInfo.getFavoredNodesList());
3845      }
3846      long procId = regionOpenInfo.getOpenProcId();
3847      if (server.submitRegionProcedure(procId)) {
3848        server.getExecutorService().submit(
3849          AssignRegionHandler.create(server, regionInfo, procId, tableDesc, masterSystemTime));
3850      }
3851    }
3852  }
3853
3854  private void executeCloseRegionProcedures(CloseRegionRequest request) {
3855    String encodedName;
3856    try {
3857      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
3858    } catch (DoNotRetryIOException e) {
3859      throw new UncheckedIOException("Should not happen", e);
3860    }
3861    ServerName destination = request.hasDestinationServer()
3862      ? ProtobufUtil.toServerName(request.getDestinationServer())
3863      : null;
3864    long procId = request.getCloseProcId();
3865    if (server.submitRegionProcedure(procId)) {
3866      server.getExecutorService()
3867        .submit(UnassignRegionHandler.create(server, encodedName, procId, false, destination));
3868    }
3869  }
3870
3871  private void executeProcedures(RemoteProcedureRequest request) {
3872    RSProcedureCallable callable;
3873    try {
3874      callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
3875        .getDeclaredConstructor().newInstance();
3876    } catch (Exception e) {
3877      LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(),
3878        request.getProcId(), e);
3879      server.remoteProcedureComplete(request.getProcId(), e);
3880      return;
3881    }
3882    callable.init(request.getProcData().toByteArray(), server);
3883    LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
3884    server.executeProcedure(request.getProcId(), callable);
3885  }
3886
3887  @Override
3888  @QosPriority(priority = HConstants.ADMIN_QOS)
3889  public ExecuteProceduresResponse executeProcedures(RpcController controller,
3890    ExecuteProceduresRequest request) throws ServiceException {
3891    try {
3892      checkOpen();
3893      throwOnWrongStartCode(request);
3894      server.getRegionServerCoprocessorHost().preExecuteProcedures();
3895      if (request.getOpenRegionCount() > 0) {
3896        // Avoid reading from the TableDescritor every time(usually it will read from the file
3897        // system)
3898        Map<TableName, TableDescriptor> tdCache = new HashMap<>();
3899        request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
3900      }
3901      if (request.getCloseRegionCount() > 0) {
3902        request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
3903      }
3904      if (request.getProcCount() > 0) {
3905        request.getProcList().forEach(this::executeProcedures);
3906      }
3907      server.getRegionServerCoprocessorHost().postExecuteProcedures();
3908      return ExecuteProceduresResponse.getDefaultInstance();
3909    } catch (IOException e) {
3910      throw new ServiceException(e);
3911    }
3912  }
3913
3914  @Override
3915  public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller,
3916    GetAllBootstrapNodesRequest request) throws ServiceException {
3917    GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder();
3918    server.getBootstrapNodes()
3919      .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server)));
3920    return builder.build();
3921  }
3922}