001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.io.UncheckedIOException;
024import java.lang.reflect.InvocationTargetException;
025import java.net.BindException;
026import java.net.InetSocketAddress;
027import java.net.UnknownHostException;
028import java.nio.ByteBuffer;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.Collections;
032import java.util.HashMap;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Map;
036import java.util.Map.Entry;
037import java.util.NavigableMap;
038import java.util.Set;
039import java.util.TreeSet;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ConcurrentMap;
042import java.util.concurrent.TimeUnit;
043import java.util.concurrent.atomic.AtomicBoolean;
044import java.util.concurrent.atomic.AtomicLong;
045import java.util.concurrent.atomic.LongAdder;
046import org.apache.commons.lang3.mutable.MutableObject;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.Path;
049import org.apache.hadoop.hbase.ByteBufferExtendedCell;
050import org.apache.hadoop.hbase.CacheEvictionStats;
051import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
052import org.apache.hadoop.hbase.Cell;
053import org.apache.hadoop.hbase.CellScannable;
054import org.apache.hadoop.hbase.CellScanner;
055import org.apache.hadoop.hbase.CellUtil;
056import org.apache.hadoop.hbase.CompareOperator;
057import org.apache.hadoop.hbase.DoNotRetryIOException;
058import org.apache.hadoop.hbase.DroppedSnapshotException;
059import org.apache.hadoop.hbase.HBaseIOException;
060import org.apache.hadoop.hbase.HConstants;
061import org.apache.hadoop.hbase.MultiActionResultTooLarge;
062import org.apache.hadoop.hbase.NotServingRegionException;
063import org.apache.hadoop.hbase.PrivateCellUtil;
064import org.apache.hadoop.hbase.RegionTooBusyException;
065import org.apache.hadoop.hbase.Server;
066import org.apache.hadoop.hbase.ServerName;
067import org.apache.hadoop.hbase.TableName;
068import org.apache.hadoop.hbase.UnknownScannerException;
069import org.apache.hadoop.hbase.client.Append;
070import org.apache.hadoop.hbase.client.ConnectionUtils;
071import org.apache.hadoop.hbase.client.Delete;
072import org.apache.hadoop.hbase.client.Durability;
073import org.apache.hadoop.hbase.client.Get;
074import org.apache.hadoop.hbase.client.Increment;
075import org.apache.hadoop.hbase.client.Mutation;
076import org.apache.hadoop.hbase.client.Put;
077import org.apache.hadoop.hbase.client.RegionInfo;
078import org.apache.hadoop.hbase.client.RegionReplicaUtil;
079import org.apache.hadoop.hbase.client.Result;
080import org.apache.hadoop.hbase.client.Row;
081import org.apache.hadoop.hbase.client.RowMutations;
082import org.apache.hadoop.hbase.client.Scan;
083import org.apache.hadoop.hbase.client.TableDescriptor;
084import org.apache.hadoop.hbase.client.VersionInfoUtil;
085import org.apache.hadoop.hbase.conf.ConfigurationObserver;
086import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
087import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
088import org.apache.hadoop.hbase.exceptions.ScannerResetException;
089import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
090import org.apache.hadoop.hbase.filter.ByteArrayComparable;
091import org.apache.hadoop.hbase.io.TimeRange;
092import org.apache.hadoop.hbase.io.hfile.BlockCache;
093import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
094import org.apache.hadoop.hbase.ipc.HBaseRpcController;
095import org.apache.hadoop.hbase.ipc.PriorityFunction;
096import org.apache.hadoop.hbase.ipc.QosPriority;
097import org.apache.hadoop.hbase.ipc.RpcCallContext;
098import org.apache.hadoop.hbase.ipc.RpcCallback;
099import org.apache.hadoop.hbase.ipc.RpcScheduler;
100import org.apache.hadoop.hbase.ipc.RpcServer;
101import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
102import org.apache.hadoop.hbase.ipc.RpcServerFactory;
103import org.apache.hadoop.hbase.ipc.RpcServerInterface;
104import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
105import org.apache.hadoop.hbase.ipc.ServerRpcController;
106import org.apache.hadoop.hbase.log.HBaseMarkers;
107import org.apache.hadoop.hbase.master.MasterRpcServices;
108import org.apache.hadoop.hbase.net.Address;
109import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
110import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
111import org.apache.hadoop.hbase.quotas.OperationQuota;
112import org.apache.hadoop.hbase.quotas.QuotaUtil;
113import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
114import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
115import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
116import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
117import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
118import org.apache.hadoop.hbase.regionserver.Leases.Lease;
119import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
120import org.apache.hadoop.hbase.regionserver.Region.Operation;
121import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
122import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
123import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler;
124import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
125import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
126import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
127import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
128import org.apache.hadoop.hbase.security.Superusers;
129import org.apache.hadoop.hbase.security.User;
130import org.apache.hadoop.hbase.security.access.AccessChecker;
131import org.apache.hadoop.hbase.security.access.Permission;
132import org.apache.hadoop.hbase.util.Bytes;
133import org.apache.hadoop.hbase.util.DNS;
134import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
135import org.apache.hadoop.hbase.util.Pair;
136import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
137import org.apache.hadoop.hbase.util.Strings;
138import org.apache.hadoop.hbase.wal.WAL;
139import org.apache.hadoop.hbase.wal.WALEdit;
140import org.apache.hadoop.hbase.wal.WALKey;
141import org.apache.hadoop.hbase.wal.WALSplitUtil;
142import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
143import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
144import org.apache.yetus.audience.InterfaceAudience;
145import org.slf4j.Logger;
146import org.slf4j.LoggerFactory;
147
148import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
149import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
150import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
151import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
152import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
153import org.apache.hbase.thirdparty.com.google.protobuf.Message;
154import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
155import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
156import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
157import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
158import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
159
160import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
161import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
162import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
248
249/**
250 * Implements the regionserver RPC services.
251 */
252@InterfaceAudience.Private
253@SuppressWarnings("deprecation")
254public class RSRpcServices implements HBaseRPCErrorHandler,
255    AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
256    ConfigurationObserver {
257  protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);
258
259  /** RPC scheduler to use for the region server. */
260  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
261    "hbase.region.server.rpc.scheduler.factory.class";
262
263  /** RPC scheduler to use for the master. */
264  public static final String MASTER_RPC_SCHEDULER_FACTORY_CLASS =
265    "hbase.master.rpc.scheduler.factory.class";
266
267  /**
268   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
269   * configuration exists to prevent the scenario where a time limit is specified to be so
270   * restrictive that the time limit is reached immediately (before any cells are scanned).
271   */
272  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
273      "hbase.region.server.rpc.minimum.scan.time.limit.delta";
274  /**
275   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
276   */
277  private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
278
279  /**
280   * Number of rows in a batch operation above which a warning will be logged.
281   */
282  static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
283  /**
284   * Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
285   */
286  static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
287
288  protected static final String RESERVOIR_ENABLED_KEY = "hbase.ipc.server.reservoir.enabled";
289
290  // Request counter. (Includes requests that are not serviced by regions.)
291  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
292  final LongAdder requestCount = new LongAdder();
293
294  // Request counter for rpc get
295  final LongAdder rpcGetRequestCount = new LongAdder();
296
297  // Request counter for rpc scan
298  final LongAdder rpcScanRequestCount = new LongAdder();
299
300  // Request counter for rpc multi
301  final LongAdder rpcMultiRequestCount = new LongAdder();
302
303  // Request counter for rpc mutate
304  final LongAdder rpcMutateRequestCount = new LongAdder();
305
306  // Server to handle client requests.
307  final RpcServerInterface rpcServer;
308  final InetSocketAddress isa;
309
310  @VisibleForTesting
311  protected final HRegionServer regionServer;
312  private final long maxScannerResultSize;
313
314  // The reference to the priority extraction function
315  private final PriorityFunction priority;
316
317  private ScannerIdGenerator scannerIdGenerator;
318  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
319  // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
320  // which may send next or close request to a region scanner which has already been exhausted. The
321  // entries will be removed automatically after scannerLeaseTimeoutPeriod.
322  private final Cache<String, String> closedScanners;
323  /**
324   * The lease timeout period for client scanners (milliseconds).
325   */
326  private final int scannerLeaseTimeoutPeriod;
327
328  /**
329   * The RPC timeout period (milliseconds)
330   */
331  private final int rpcTimeout;
332
333  /**
334   * The minimum allowable delta to use for the scan limit
335   */
336  private final long minimumScanTimeLimitDelta;
337
338  /**
339   * Row size threshold for multi requests above which a warning is logged
340   */
341  private final int rowSizeWarnThreshold;
342
343  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);
344
345  // We want to vet all accesses at the point of entry itself; limiting scope of access checker
346  // instance to only this class to prevent its use from spreading deeper into implementation.
347  // Initialized in start() since AccessChecker needs ZKWatcher which is created by HRegionServer
348  // after RSRpcServices constructor and before start() is called.
349  // Initialized only if authorization is enabled, else remains null.
350  protected AccessChecker accessChecker;
351
352  /**
353   * Services launched in RSRpcServices. By default they are on but you can use the below
354   * booleans to selectively enable/disable either Admin or Client Service (Rare is the case
355   * where you would ever turn off one or the other).
356   */
357  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
358      "hbase.regionserver.admin.executorService";
359  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
360      "hbase.regionserver.client.executorService";
361
362  /**
363   * An Rpc callback for closing a RegionScanner.
364   */
365  private static final class RegionScannerCloseCallBack implements RpcCallback {
366
367    private final RegionScanner scanner;
368
369    public RegionScannerCloseCallBack(RegionScanner scanner) {
370      this.scanner = scanner;
371    }
372
373    @Override
374    public void run() throws IOException {
375      this.scanner.close();
376    }
377  }
378
379  /**
380   * An Rpc callback for doing shipped() call on a RegionScanner.
381   */
382  private class RegionScannerShippedCallBack implements RpcCallback {
383
384    private final String scannerName;
385    private final Shipper shipper;
386    private final Lease lease;
387
388    public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
389      this.scannerName = scannerName;
390      this.shipper = shipper;
391      this.lease = lease;
392    }
393
394    @Override
395    public void run() throws IOException {
396      this.shipper.shipped();
397      // We're done. On way out re-add the above removed lease. The lease was temp removed for this
398      // Rpc call and we are at end of the call now. Time to add it back.
399      if (scanners.containsKey(scannerName)) {
400        if (lease != null) regionServer.leases.addLease(lease);
401      }
402    }
403  }
404
405  /**
406   * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
407   * completion of multiGets.
408   */
409   static class RegionScannersCloseCallBack implements RpcCallback {
410    private final List<RegionScanner> scanners = new ArrayList<>();
411
412    public void addScanner(RegionScanner scanner) {
413      this.scanners.add(scanner);
414    }
415
416    @Override
417    public void run() {
418      for (RegionScanner scanner : scanners) {
419        try {
420          scanner.close();
421        } catch (IOException e) {
422          LOG.error("Exception while closing the scanner " + scanner, e);
423        }
424      }
425    }
426  }
427
428  /**
429   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
430   */
431  private static final class RegionScannerHolder {
432
433    private final AtomicLong nextCallSeq = new AtomicLong(0);
434    private final String scannerName;
435    private final RegionScanner s;
436    private final HRegion r;
437    private final RpcCallback closeCallBack;
438    private final RpcCallback shippedCallback;
439    private byte[] rowOfLastPartialResult;
440    private boolean needCursor;
441
442    public RegionScannerHolder(String scannerName, RegionScanner s, HRegion r,
443        RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor) {
444      this.scannerName = scannerName;
445      this.s = s;
446      this.r = r;
447      this.closeCallBack = closeCallBack;
448      this.shippedCallback = shippedCallback;
449      this.needCursor = needCursor;
450    }
451
452    public long getNextCallSeq() {
453      return nextCallSeq.get();
454    }
455
456    public boolean incNextCallSeq(long currentSeq) {
457      // Use CAS to prevent multiple scan request running on the same scanner.
458      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
459    }
460  }
461
462  /**
463   * Instantiated as a scanner lease. If the lease times out, the scanner is
464   * closed
465   */
466  private class ScannerListener implements LeaseListener {
467    private final String scannerName;
468
469    ScannerListener(final String n) {
470      this.scannerName = n;
471    }
472
473    @Override
474    public void leaseExpired() {
475      RegionScannerHolder rsh = scanners.remove(this.scannerName);
476      if (rsh != null) {
477        RegionScanner s = rsh.s;
478        LOG.info("Scanner " + this.scannerName + " lease expired on region "
479          + s.getRegionInfo().getRegionNameAsString());
480        HRegion region = null;
481        try {
482          region = regionServer.getRegion(s.getRegionInfo().getRegionName());
483          if (region != null && region.getCoprocessorHost() != null) {
484            region.getCoprocessorHost().preScannerClose(s);
485          }
486        } catch (IOException e) {
487          LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
488        } finally {
489          try {
490            s.close();
491            if (region != null && region.getCoprocessorHost() != null) {
492              region.getCoprocessorHost().postScannerClose(s);
493            }
494          } catch (IOException e) {
495            LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
496          }
497        }
498      } else {
499        LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
500          " scanner found, hence no chance to close that related scanner!");
501      }
502    }
503  }
504
505  private static ResultOrException getResultOrException(final ClientProtos.Result r,
506                                                        final int index){
507    return getResultOrException(ResponseConverter.buildActionResult(r), index);
508  }
509
510  private static ResultOrException getResultOrException(final Exception e, final int index) {
511    return getResultOrException(ResponseConverter.buildActionResult(e), index);
512  }
513
514  private static ResultOrException getResultOrException(
515      final ResultOrException.Builder builder, final int index) {
516    return builder.setIndex(index).build();
517  }
518
519  /**
520   * Checks for the following pre-checks in order:
521   * <ol>
522   *   <li>RegionServer is running</li>
523   *   <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
524   * </ol>
525   * @param requestName name of rpc request. Used in reporting failures to provide context.
526   * @throws ServiceException If any of the above listed pre-check fails.
527   */
528  private void rpcPreCheck(String requestName) throws ServiceException {
529    try {
530      checkOpen();
531      requirePermission(requestName, Permission.Action.ADMIN);
532    } catch (IOException ioe) {
533      throw new ServiceException(ioe);
534    }
535  }
536
537  /**
538   * Starts the nonce operation for a mutation, if needed.
539   * @param mutation Mutation.
540   * @param nonceGroup Nonce group from the request.
541   * @return whether to proceed this mutation.
542   */
543  private boolean startNonceOperation(final MutationProto mutation, long nonceGroup)
544      throws IOException {
545    if (regionServer.nonceManager == null || !mutation.hasNonce()) return true;
546    boolean canProceed = false;
547    try {
548      canProceed = regionServer.nonceManager.startOperation(
549        nonceGroup, mutation.getNonce(), regionServer);
550    } catch (InterruptedException ex) {
551      throw new InterruptedIOException("Nonce start operation interrupted");
552    }
553    return canProceed;
554  }
555
556  /**
557   * Ends nonce operation for a mutation, if needed.
558   * @param mutation Mutation.
559   * @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
560   * @param success Whether the operation for this nonce has succeeded.
561   */
562  private void endNonceOperation(final MutationProto mutation,
563      long nonceGroup, boolean success) {
564    if (regionServer.nonceManager != null && mutation.hasNonce()) {
565      regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
566    }
567  }
568
569  private boolean isClientCellBlockSupport(RpcCallContext context) {
570    return context != null && context.isClientCellBlockSupported();
571  }
572
573  private void addResult(final MutateResponse.Builder builder, final Result result,
574      final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
575    if (result == null) return;
576    if (clientCellBlockSupported) {
577      builder.setResult(ProtobufUtil.toResultNoData(result));
578      rpcc.setCellScanner(result.cellScanner());
579    } else {
580      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
581      builder.setResult(pbr);
582    }
583  }
584
585  private void addResults(ScanResponse.Builder builder, List<Result> results,
586      HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
587    builder.setStale(!isDefaultRegion);
588    if (results.isEmpty()) {
589      return;
590    }
591    if (clientCellBlockSupported) {
592      for (Result res : results) {
593        builder.addCellsPerResult(res.size());
594        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
595      }
596      controller.setCellScanner(CellUtil.createCellScanner(results));
597    } else {
598      for (Result res : results) {
599        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
600        builder.addResults(pbr);
601      }
602    }
603  }
604
605  /**
606   * Mutate a list of rows atomically.
607   * @param cellScanner if non-null, the mutation data -- the Cell content.
608   */
609  private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
610    final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
611    ByteArrayComparable comparator, TimeRange timeRange, RegionActionResult.Builder builder,
612    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
613    int countOfCompleteMutation = 0;
614    try {
615      if (!region.getRegionInfo().isMetaRegion()) {
616        regionServer.cacheFlusher.reclaimMemStoreMemory();
617      }
618      RowMutations rm = null;
619      int i = 0;
620      ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
621        ClientProtos.ResultOrException.newBuilder();
622      for (ClientProtos.Action action: actions) {
623        if (action.hasGet()) {
624          throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
625            action.getGet());
626        }
627        MutationType type = action.getMutation().getMutateType();
628        if (rm == null) {
629          rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size());
630        }
631        switch (type) {
632          case PUT:
633            Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
634            ++countOfCompleteMutation;
635            checkCellSizeLimit(region, put);
636            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
637            rm.add(put);
638            break;
639          case DELETE:
640            Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
641            ++countOfCompleteMutation;
642            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
643            rm.add(del);
644            break;
645          default:
646            throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
647        }
648        // To unify the response format with doNonAtomicRegionMutation and read through client's
649        // AsyncProcess we have to add an empty result instance per operation
650        resultOrExceptionOrBuilder.clear();
651        resultOrExceptionOrBuilder.setIndex(i++);
652        builder.addResultOrException(
653          resultOrExceptionOrBuilder.build());
654      }
655      return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm);
656    } finally {
657      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
658      // even if the malformed cells are not skipped.
659      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
660        skipCellsForMutation(actions.get(i), cellScanner);
661      }
662    }
663  }
664
665  /**
666   * Execute an append mutation.
667   *
668   * @return result to return to client if default operation should be
669   * bypassed as indicated by RegionObserver, null otherwise
670   */
671  private Result append(final HRegion region, final OperationQuota quota,
672      final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
673      ActivePolicyEnforcement spaceQuota)
674      throws IOException {
675    long before = EnvironmentEdgeManager.currentTime();
676    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
677    checkCellSizeLimit(region, append);
678    spaceQuota.getPolicyEnforcement(region).check(append);
679    quota.addMutation(append);
680    Result r = null;
681    if (region.getCoprocessorHost() != null) {
682      r = region.getCoprocessorHost().preAppend(append);
683    }
684    if (r == null) {
685      boolean canProceed = startNonceOperation(mutation, nonceGroup);
686      boolean success = false;
687      try {
688        long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
689        if (canProceed) {
690          r = region.append(append, nonceGroup, nonce);
691        } else {
692          // convert duplicate append to get
693          List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false,
694              nonceGroup, nonce);
695          r = Result.create(results);
696        }
697        success = true;
698      } finally {
699        if (canProceed) {
700          endNonceOperation(mutation, nonceGroup, success);
701        }
702      }
703      if (region.getCoprocessorHost() != null) {
704        r = region.getCoprocessorHost().postAppend(append, r);
705      }
706    }
707    if (regionServer.metricsRegionServer != null) {
708      regionServer.metricsRegionServer.updateAppend(
709          region.getTableDescriptor().getTableName(),
710        EnvironmentEdgeManager.currentTime() - before);
711    }
712    return r == null ? Result.EMPTY_RESULT : r;
713  }
714
715  /**
716   * Execute an increment mutation.
717   *
718   * @param region
719   * @param mutation
720   * @return the Result
721   * @throws IOException
722   */
723  private Result increment(final HRegion region, final OperationQuota quota,
724      final MutationProto mutation, final CellScanner cells, long nonceGroup,
725      ActivePolicyEnforcement spaceQuota)
726      throws IOException {
727    long before = EnvironmentEdgeManager.currentTime();
728    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
729    checkCellSizeLimit(region, increment);
730    spaceQuota.getPolicyEnforcement(region).check(increment);
731    quota.addMutation(increment);
732    Result r = null;
733    if (region.getCoprocessorHost() != null) {
734      r = region.getCoprocessorHost().preIncrement(increment);
735    }
736    if (r == null) {
737      boolean canProceed = startNonceOperation(mutation, nonceGroup);
738      boolean success = false;
739      try {
740        long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
741        if (canProceed) {
742          r = region.increment(increment, nonceGroup, nonce);
743        } else {
744          // convert duplicate increment to get
745          List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup,
746              nonce);
747          r = Result.create(results);
748        }
749        success = true;
750      } finally {
751        if (canProceed) {
752          endNonceOperation(mutation, nonceGroup, success);
753        }
754      }
755      if (region.getCoprocessorHost() != null) {
756        r = region.getCoprocessorHost().postIncrement(increment, r);
757      }
758    }
759    if (regionServer.metricsRegionServer != null) {
760      regionServer.metricsRegionServer.updateIncrement(
761          region.getTableDescriptor().getTableName(),
762          EnvironmentEdgeManager.currentTime() - before);
763    }
764    return r == null ? Result.EMPTY_RESULT : r;
765  }
766
767  /**
768   * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
769   * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
770   * @param cellsToReturn  Could be null. May be allocated in this method.  This is what this
771   * method returns as a 'result'.
772   * @param closeCallBack the callback to be used with multigets
773   * @param context the current RpcCallContext
774   * @return Return the <code>cellScanner</code> passed
775   */
776  private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
777      final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
778      final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
779      final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
780      ActivePolicyEnforcement spaceQuotaEnforcement) {
781    // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than do
782    // one at a time, we instead pass them in batch.  Be aware that the corresponding
783    // ResultOrException instance that matches each Put or Delete is then added down in the
784    // doNonAtomicBatchOp call.  We should be staying aligned though the Put and Delete are
785    // deferred/batched
786    List<ClientProtos.Action> mutations = null;
787    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
788    IOException sizeIOE = null;
789    Object lastBlock = null;
790    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = ResultOrException.newBuilder();
791    boolean hasResultOrException = false;
792    for (ClientProtos.Action action : actions.getActionList()) {
793      hasResultOrException = false;
794      resultOrExceptionBuilder.clear();
795      try {
796        Result r = null;
797
798        if (context != null
799            && context.isRetryImmediatelySupported()
800            && (context.getResponseCellSize() > maxQuotaResultSize
801              || context.getResponseBlockSize() + context.getResponseExceptionSize()
802              > maxQuotaResultSize)) {
803
804          // We're storing the exception since the exception and reason string won't
805          // change after the response size limit is reached.
806          if (sizeIOE == null ) {
807            // We don't need the stack un-winding do don't throw the exception.
808            // Throwing will kill the JVM's JIT.
809            //
810            // Instead just create the exception and then store it.
811            sizeIOE = new MultiActionResultTooLarge("Max size exceeded"
812                + " CellSize: " + context.getResponseCellSize()
813                + " BlockSize: " + context.getResponseBlockSize());
814
815            // Only report the exception once since there's only one request that
816            // caused the exception. Otherwise this number will dominate the exceptions count.
817            rpcServer.getMetrics().exception(sizeIOE);
818          }
819
820          // Now that there's an exception is known to be created
821          // use it for the response.
822          //
823          // This will create a copy in the builder.
824          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
825          resultOrExceptionBuilder.setException(pair);
826          context.incrementResponseExceptionSize(pair.getSerializedSize());
827          resultOrExceptionBuilder.setIndex(action.getIndex());
828          builder.addResultOrException(resultOrExceptionBuilder.build());
829          skipCellsForMutation(action, cellScanner);
830          continue;
831        }
832        if (action.hasGet()) {
833          long before = EnvironmentEdgeManager.currentTime();
834          ClientProtos.Get pbGet = action.getGet();
835          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
836          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
837          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
838          // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
839          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
840            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? " +
841                "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " +
842                "reverse Scan.");
843          }
844          try {
845            Get get = ProtobufUtil.toGet(pbGet);
846            if (context != null) {
847              r = get(get, (region), closeCallBack, context);
848            } else {
849              r = region.get(get);
850            }
851          } finally {
852            if (regionServer.metricsRegionServer != null) {
853              regionServer.metricsRegionServer.updateGet(
854                  region.getTableDescriptor().getTableName(),
855                  EnvironmentEdgeManager.currentTime() - before);
856            }
857          }
858        } else if (action.hasServiceCall()) {
859          hasResultOrException = true;
860          com.google.protobuf.Message result =
861            execServiceOnRegion(region, action.getServiceCall());
862          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
863            ClientProtos.CoprocessorServiceResult.newBuilder();
864          resultOrExceptionBuilder.setServiceResult(
865            serviceResultBuilder.setValue(
866              serviceResultBuilder.getValueBuilder()
867                .setName(result.getClass().getName())
868                // TODO: Copy!!!
869                .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
870        } else if (action.hasMutation()) {
871          MutationType type = action.getMutation().getMutateType();
872          if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
873              !mutations.isEmpty()) {
874            // Flush out any Puts or Deletes already collected.
875            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
876              spaceQuotaEnforcement);
877            mutations.clear();
878          }
879          switch (type) {
880            case APPEND:
881              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
882                  spaceQuotaEnforcement);
883              break;
884            case INCREMENT:
885              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
886                  spaceQuotaEnforcement);
887              break;
888            case PUT:
889            case DELETE:
890              // Collect the individual mutations and apply in a batch
891              if (mutations == null) {
892                mutations = new ArrayList<>(actions.getActionCount());
893              }
894              mutations.add(action);
895              break;
896            default:
897              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
898          }
899        } else {
900          throw new HBaseIOException("Unexpected Action type");
901        }
902        if (r != null) {
903          ClientProtos.Result pbResult = null;
904          if (isClientCellBlockSupport(context)) {
905            pbResult = ProtobufUtil.toResultNoData(r);
906            //  Hard to guess the size here.  Just make a rough guess.
907            if (cellsToReturn == null) {
908              cellsToReturn = new ArrayList<>();
909            }
910            cellsToReturn.add(r);
911          } else {
912            pbResult = ProtobufUtil.toResult(r);
913          }
914          lastBlock = addSize(context, r, lastBlock);
915          hasResultOrException = true;
916          resultOrExceptionBuilder.setResult(pbResult);
917        }
918        // Could get to here and there was no result and no exception.  Presumes we added
919        // a Put or Delete to the collecting Mutations List for adding later.  In this
920        // case the corresponding ResultOrException instance for the Put or Delete will be added
921        // down in the doNonAtomicBatchOp method call rather than up here.
922      } catch (IOException ie) {
923        rpcServer.getMetrics().exception(ie);
924        hasResultOrException = true;
925        NameBytesPair pair = ResponseConverter.buildException(ie);
926        resultOrExceptionBuilder.setException(pair);
927        context.incrementResponseExceptionSize(pair.getSerializedSize());
928      }
929      if (hasResultOrException) {
930        // Propagate index.
931        resultOrExceptionBuilder.setIndex(action.getIndex());
932        builder.addResultOrException(resultOrExceptionBuilder.build());
933      }
934    }
935    // Finish up any outstanding mutations
936    if (!CollectionUtils.isEmpty(mutations)) {
937      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
938    }
939    return cellsToReturn;
940  }
941
942  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
943    if (r.maxCellSize > 0) {
944      CellScanner cells = m.cellScanner();
945      while (cells.advance()) {
946        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
947        if (size > r.maxCellSize) {
948          String msg = "Cell with size " + size + " exceeds limit of " + r.maxCellSize + " bytes";
949          if (LOG.isDebugEnabled()) {
950            LOG.debug(msg);
951          }
952          throw new DoNotRetryIOException(msg);
953        }
954      }
955    }
956  }
957
  /**
   * Run a batch of Put/Delete mutations atomically: failures propagate as an exception for the
   * whole batch instead of per-action results.
   */
  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations,
    final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement)
    throws IOException {
    // Just throw the exception. It will be caught and added as a region-level exception for the
    // RegionAction. Leaving a null action result is fine since the client treats a null result
    // as failure, and the region-level exception replaces it. See
    // AsyncRequestFutureImpl#receiveMultiAction and AsyncBatchRpcRetryingCaller#onComplete.
    doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, true);
  }
969
970  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
971    final OperationQuota quota, final List<ClientProtos.Action> mutations,
972    final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
973    try {
974      doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, false);
975    } catch (IOException e) {
976      // Set the exception for each action. The mutations in same RegionAction are group to
977      // different batch and then be processed individually. Hence, we don't set the region-level
978      // exception here for whole RegionAction.
979      for (Action mutation : mutations) {
980        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
981      }
982    }
983  }
984
985  /**
986   * Execute a list of Put/Delete mutations.
987   *
988   * @param builder
989   * @param region
990   * @param mutations
991   */
992  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
993      final OperationQuota quota, final List<ClientProtos.Action> mutations,
994      final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
995      throws IOException {
996    Mutation[] mArray = new Mutation[mutations.size()];
997    long before = EnvironmentEdgeManager.currentTime();
998    boolean batchContainsPuts = false, batchContainsDelete = false;
999    try {
1000      /** HBASE-17924
1001       * mutationActionMap is a map to map the relation between mutations and actions
1002       * since mutation array may have been reoredered.In order to return the right
1003       * result or exception to the corresponding actions, We need to know which action
1004       * is the mutation belong to. We can't sort ClientProtos.Action array, since they
1005       * are bonded to cellscanners.
1006       */
1007      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
1008      int i = 0;
1009      for (ClientProtos.Action action: mutations) {
1010        if (action.hasGet()) {
1011          throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
1012            action.getGet());
1013        }
1014        MutationProto m = action.getMutation();
1015        Mutation mutation;
1016        if (m.getMutateType() == MutationType.PUT) {
1017          mutation = ProtobufUtil.toPut(m, cells);
1018          batchContainsPuts = true;
1019        } else {
1020          mutation = ProtobufUtil.toDelete(m, cells);
1021          batchContainsDelete = true;
1022        }
1023        mutationActionMap.put(mutation, action);
1024        mArray[i++] = mutation;
1025        checkCellSizeLimit(region, mutation);
1026        // Check if a space quota disallows this mutation
1027        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
1028        quota.addMutation(mutation);
1029      }
1030
1031      if (!region.getRegionInfo().isMetaRegion()) {
1032        regionServer.cacheFlusher.reclaimMemStoreMemory();
1033      }
1034
1035      // HBASE-17924
1036      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
1037      // order is preserved as its expected from the client
1038      if (!atomic) {
1039        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
1040      }
1041
1042      OperationStatus[] codes = region.batchMutate(mArray, atomic, HConstants.NO_NONCE,
1043        HConstants.NO_NONCE);
1044      for (i = 0; i < codes.length; i++) {
1045        Mutation currentMutation = mArray[i];
1046        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
1047        int index = currentAction.hasIndex() || !atomic ? currentAction.getIndex() : i;
1048        Exception e = null;
1049        switch (codes[i].getOperationStatusCode()) {
1050          case BAD_FAMILY:
1051            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
1052            builder.addResultOrException(getResultOrException(e, index));
1053            break;
1054
1055          case SANITY_CHECK_FAILURE:
1056            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
1057            builder.addResultOrException(getResultOrException(e, index));
1058            break;
1059
1060          default:
1061            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
1062            builder.addResultOrException(getResultOrException(e, index));
1063            break;
1064
1065          case SUCCESS:
1066            builder.addResultOrException(getResultOrException(
1067              ClientProtos.Result.getDefaultInstance(), index));
1068            break;
1069
1070          case STORE_TOO_BUSY:
1071            e = new RegionTooBusyException(codes[i].getExceptionMsg());
1072            builder.addResultOrException(getResultOrException(e, index));
1073            break;
1074        }
1075      }
1076    } finally {
1077      int processedMutationIndex = 0;
1078      for (Action mutation : mutations) {
1079        // The non-null mArray[i] means the cell scanner has been read.
1080        if (mArray[processedMutationIndex++] == null) {
1081          skipCellsForMutation(mutation, cells);
1082        }
1083      }
1084      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1085    }
1086  }
1087
1088  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
1089    boolean batchContainsDelete) {
1090    if (regionServer.metricsRegionServer != null) {
1091      long after = EnvironmentEdgeManager.currentTime();
1092      if (batchContainsPuts) {
1093        regionServer.metricsRegionServer
1094          .updatePutBatch(region.getTableDescriptor().getTableName(), after - starttime);
1095      }
1096      if (batchContainsDelete) {
1097        regionServer.metricsRegionServer
1098          .updateDeleteBatch(region.getTableDescriptor().getTableName(), after - starttime);
1099      }
1100    }
1101  }
1102
1103  /**
1104   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
1105   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
1106   * @param region
1107   * @param mutations
1108   * @param replaySeqId
1109   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
1110   *         exceptionMessage if any
1111   * @throws IOException
1112   */
1113  private OperationStatus [] doReplayBatchOp(final HRegion region,
1114      final List<MutationReplay> mutations, long replaySeqId) throws IOException {
1115    long before = EnvironmentEdgeManager.currentTime();
1116    boolean batchContainsPuts = false, batchContainsDelete = false;
1117    try {
1118      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
1119        MutationReplay m = it.next();
1120
1121        if (m.getType() == MutationType.PUT) {
1122          batchContainsPuts = true;
1123        } else {
1124          batchContainsDelete = true;
1125        }
1126
1127        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
1128        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
1129        if (metaCells != null && !metaCells.isEmpty()) {
1130          for (Cell metaCell : metaCells) {
1131            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
1132            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1133            HRegion hRegion = region;
1134            if (compactionDesc != null) {
1135              // replay the compaction. Remove the files from stores only if we are the primary
1136              // region replica (thus own the files)
1137              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
1138                replaySeqId);
1139              continue;
1140            }
1141            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
1142            if (flushDesc != null && !isDefaultReplica) {
1143              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
1144              continue;
1145            }
1146            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
1147            if (regionEvent != null && !isDefaultReplica) {
1148              hRegion.replayWALRegionEventMarker(regionEvent);
1149              continue;
1150            }
1151            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
1152            if (bulkLoadEvent != null) {
1153              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
1154              continue;
1155            }
1156          }
1157          it.remove();
1158        }
1159      }
1160      requestCount.increment();
1161      if (!region.getRegionInfo().isMetaRegion()) {
1162        regionServer.cacheFlusher.reclaimMemStoreMemory();
1163      }
1164      return region.batchReplay(mutations.toArray(
1165        new MutationReplay[mutations.size()]), replaySeqId);
1166    } finally {
1167      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1168    }
1169  }
1170
1171  private void closeAllScanners() {
1172    // Close any outstanding scanners. Means they'll get an UnknownScanner
1173    // exception next time they come in.
1174    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
1175      try {
1176        e.getValue().s.close();
1177      } catch (IOException ioe) {
1178        LOG.warn("Closing scanner " + e.getKey(), ioe);
1179      }
1180    }
1181  }
1182
  // Exposed for testing
  /** Hook through which the large-batch warning is emitted; tests substitute their own. */
  interface LogDelegate {
    void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThreshold);
  }
1187
1188  private static LogDelegate DEFAULT_LOG_DELEGATE = new LogDelegate() {
1189    @Override
1190    public void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThreshold) {
1191      if (LOG.isWarnEnabled()) {
1192        LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
1193            + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
1194            + RpcServer.getRequestUserName().orElse(null) + "/"
1195            + RpcServer.getRemoteAddress().orElse(null)
1196            + " first region in multi=" + firstRegionName);
1197      }
1198    }
1199  };
1200
1201  private final LogDelegate ld;
1202
  /** Construct with the default large-batch warning log delegate. */
  public RSRpcServices(HRegionServer rs) throws IOException {
    this(rs, DEFAULT_LOG_DELEGATE);
  }
1206
  // Directly invoked only for testing
  /**
   * Build the RPC services for a regionserver (or master, via {@code MasterRpcServices}),
   * resolving the advertised/bind addresses, creating the RpcServer, and reading the
   * scan/RPC timeout settings from configuration.
   * @param rs the owning server; its configuration drives everything read here
   * @param ld delegate for large-batch warnings (tests pass their own)
   * @throws IOException if hostname resolution fails or the RpcServer cannot bind
   */
  RSRpcServices(HRegionServer rs, LogDelegate ld) throws IOException {
    this.ld = ld;
    regionServer = rs;
    rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
    // The scheduler factory is configurable; reflection failures are a configuration error.
    RpcSchedulerFactory rpcSchedulerFactory;
    try {
      rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class)
          .getDeclaredConstructor().newInstance();
    } catch (NoSuchMethodException | InvocationTargetException |
        InstantiationException | IllegalAccessException e) {
      throw new IllegalArgumentException(e);
    }
    // Server to handle client requests.  Master and regionserver use different
    // hostname/port/bind-address configuration keys.
    InetSocketAddress initialIsa;
    InetSocketAddress bindAddress;
    if(this instanceof MasterRpcServices) {
      String hostname = getHostname(rs.conf, true);
      int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port);
    } else {
      String hostname = getHostname(rs.conf, false);
      int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT,
        HConstants.DEFAULT_REGIONSERVER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress = new InetSocketAddress(
        rs.conf.get("hbase.regionserver.ipc.address", hostname), port);
    }
    if (initialIsa.getAddress() == null) {
      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
    }
    priority = createPriority();
    // Using Address means we don't get the IP too. Shorten it more even to just the host name
    // w/o the domain.
    String name = rs.getProcessName() + "/" +
        Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain();
    // Set how many times to retry talking to another server over Connection.
    ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
    rpcServer = createRpcServer(rs, rs.conf, rpcSchedulerFactory, bindAddress, name);
    rpcServer.setRsRpcServices(this);
    // Scan/RPC timeout and size limits, all sourced from configuration.
    scannerLeaseTimeoutPeriod = rs.conf.getInt(
      HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    maxScannerResultSize = rs.conf.getLong(
      HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
    rpcTimeout = rs.conf.getInt(
      HConstants.HBASE_RPC_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = rs.conf.getLong(
      REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);

    InetSocketAddress address = rpcServer.getListenerAddress();
    if (address == null) {
      throw new IOException("Listener channel is closed");
    }
    // Set our address, however we need the final port that was given to rpcServer
    isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
    rpcServer.setErrorHandler(this);
    rs.setName(name);

    // Recently-closed scanners expire after the lease timeout; presumably kept so late client
    // calls can be distinguished from truly unknown scanners -- confirm against scan handling.
    closedScanners = CacheBuilder.newBuilder()
        .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }
1275
  /**
   * Create the RpcServer that handles client requests, bound to {@code bindAddress}.
   * @throws IOException if the port is in use (the BindException is wrapped with a hint about
   *           which configuration property changes the port)
   */
  protected RpcServerInterface createRpcServer(Server server, Configuration conf,
      RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name)
      throws IOException {
    boolean reservoirEnabled = conf.getBoolean(RESERVOIR_ENABLED_KEY, true);
    try {
      return RpcServerFactory.createRpcServer(server, name, getServices(),
          bindAddress, // use final bindAddress for this server.
          conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
    } catch (BindException be) {
      // NOTE(review): the hint always names the regionserver port property even when this runs
      // for a MasterRpcServices instance -- confirm whether the master subclass overrides this.
      throw new IOException(be.getMessage() + ". To switch ports use the '"
          + HConstants.REGIONSERVER_PORT + "' configuration property.",
          be.getCause() != null ? be.getCause() : be);
    }
  }
1290
  /**
   * @return the configured RpcSchedulerFactory class
   *         ({@code REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS}), defaulting to
   *         {@link SimpleRpcSchedulerFactory}
   */
  protected Class<?> getRpcSchedulerFactoryClass() {
    return this.regionServer.conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }
1295
1296  @Override
1297  public void onConfigurationChange(Configuration newConf) {
1298    if (rpcServer instanceof ConfigurationObserver) {
1299      ((ConfigurationObserver)rpcServer).onConfigurationChange(newConf);
1300    }
1301  }
1302
  /** @return the PriorityFunction used to rank incoming RPCs; subclasses may override. */
  protected PriorityFunction createPriority() {
    return new AnnotationReadingPriorityFunction(this);
  }
1306
  /**
   * Require the current RPC caller to hold {@code perm}; a no-op when access checking is
   * disabled ({@code accessChecker} null).
   * @throws IOException presumably when the caller lacks the permission -- thrown from
   *           {@code accessChecker.requirePermission}; confirm against AccessChecker
   */
  protected void requirePermission(String request, Permission.Action perm) throws IOException {
    if (accessChecker != null) {
      accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), request, null, perm);
    }
  }
1312
1313
1314  public static String getHostname(Configuration conf, boolean isMaster)
1315      throws UnknownHostException {
1316    String hostname = conf.get(isMaster? HRegionServer.MASTER_HOSTNAME_KEY :
1317      HRegionServer.RS_HOSTNAME_KEY);
1318    if (hostname == null || hostname.isEmpty()) {
1319      String masterOrRS = isMaster ? "master" : "regionserver";
1320      return Strings.domainNamePointerToHostName(DNS.getDefaultHost(
1321        conf.get("hbase." + masterOrRS + ".dns.interface", "default"),
1322        conf.get("hbase." + masterOrRS + ".dns.nameserver", "default")));
1323    } else {
1324      LOG.info("hostname is configured to be " + hostname);
1325      return hostname;
1326    }
1327  }
1328
1329  @VisibleForTesting
1330  public int getScannersCount() {
1331    return scanners.size();
1332  }
1333
1334  public
1335  RegionScanner getScanner(long scannerId) {
1336    String scannerIdString = Long.toString(scannerId);
1337    RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
1338    if (scannerHolder != null) {
1339      return scannerHolder.s;
1340    }
1341    return null;
1342  }
1343
1344  public String getScanDetailsWithId(long scannerId) {
1345    RegionScanner scanner = getScanner(scannerId);
1346    if (scanner == null) {
1347      return null;
1348    }
1349    StringBuilder builder = new StringBuilder();
1350    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
1351    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
1352    return builder.toString();
1353  }
1354
1355  /**
1356   * Get the vtime associated with the scanner.
1357   * Currently the vtime is the number of "next" calls.
1358   */
1359  long getScannerVirtualTime(long scannerId) {
1360    String scannerIdString = Long.toString(scannerId);
1361    RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
1362    if (scannerHolder != null) {
1363      return scannerHolder.getNextCallSeq();
1364    }
1365    return 0L;
1366  }
1367
1368  /**
1369   * Method to account for the size of retained cells and retained data blocks.
1370   * @param context rpc call context
1371   * @param r result to add size.
1372   * @param lastBlock last block to check whether we need to add the block size in context.
1373   * @return an object that represents the last referenced block from this response.
1374   */
1375  Object addSize(RpcCallContext context, Result r, Object lastBlock) {
1376    if (context != null && r != null && !r.isEmpty()) {
1377      for (Cell c : r.rawCells()) {
1378        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
1379
1380        // Since byte buffers can point all kinds of crazy places it's harder to keep track
1381        // of which blocks are kept alive by what byte buffer.
1382        // So we make a guess.
1383        if (c instanceof ByteBufferExtendedCell) {
1384          ByteBufferExtendedCell bbCell = (ByteBufferExtendedCell) c;
1385          ByteBuffer bb = bbCell.getValueByteBuffer();
1386          if (bb != lastBlock) {
1387            context.incrementResponseBlockSize(bb.capacity());
1388            lastBlock = bb;
1389          }
1390        } else {
1391          // We're using the last block being the same as the current block as
1392          // a proxy for pointing to a new block. This won't be exact.
1393          // If there are multiple gets that bounce back and forth
1394          // Then it's possible that this will over count the size of
1395          // referenced blocks. However it's better to over count and
1396          // use two rpcs than to OOME the regionserver.
1397          byte[] valueArray = c.getValueArray();
1398          if (valueArray != lastBlock) {
1399            context.incrementResponseBlockSize(valueArray.length);
1400            lastBlock = valueArray;
1401          }
1402        }
1403
1404      }
1405    }
1406    return lastBlock;
1407  }
1408
1409  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
1410      HRegion r, boolean needCursor) throws LeaseStillHeldException {
1411    Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
1412      new ScannerListener(scannerName));
1413    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
1414    RpcCallback closeCallback;
1415    if (s instanceof RpcCallback) {
1416      closeCallback = (RpcCallback) s;
1417    } else {
1418      closeCallback = new RegionScannerCloseCallBack(s);
1419    }
1420    RegionScannerHolder rsh =
1421        new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback, needCursor);
1422    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
1423    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! " +
1424      scannerName;
1425    return rsh;
1426  }
1427
1428  /**
1429   * Find the HRegion based on a region specifier
1430   *
1431   * @param regionSpecifier the region specifier
1432   * @return the corresponding region
1433   * @throws IOException if the specifier is not null,
1434   *    but failed to find the region
1435   */
1436  @VisibleForTesting
1437  public HRegion getRegion(
1438      final RegionSpecifier regionSpecifier) throws IOException {
1439    return regionServer.getRegion(regionSpecifier.getValue().toByteArray());
1440  }
1441
1442  /**
1443   * Find the List of HRegions based on a list of region specifiers
1444   *
1445   * @param regionSpecifiers the list of region specifiers
1446   * @return the corresponding list of regions
1447   * @throws IOException if any of the specifiers is not null,
1448   *    but failed to find the region
1449   */
1450  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
1451      final CacheEvictionStatsBuilder stats) {
1452    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
1453    for (RegionSpecifier regionSpecifier: regionSpecifiers) {
1454      try {
1455        regions.add(regionServer.getRegion(regionSpecifier.getValue().toByteArray()));
1456      } catch (NotServingRegionException e) {
1457        stats.addException(regionSpecifier.getValue().toByteArray(), e);
1458      }
1459    }
1460    return regions;
1461  }
1462
1463  @VisibleForTesting
1464  public PriorityFunction getPriority() {
1465    return priority;
1466  }
1467
1468  @VisibleForTesting
1469  public Configuration getConfiguration() {
1470    return regionServer.getConfiguration();
1471  }
1472
1473  private RegionServerRpcQuotaManager getRpcQuotaManager() {
1474    return regionServer.getRegionServerRpcQuotaManager();
1475  }
1476
1477  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
1478    return regionServer.getRegionServerSpaceQuotaManager();
1479  }
1480
1481  void start(ZKWatcher zkWatcher) {
1482    if (AccessChecker.isAuthorizationSupported(getConfiguration())) {
1483      accessChecker = new AccessChecker(getConfiguration(), zkWatcher);
1484    }
1485    this.scannerIdGenerator = new ScannerIdGenerator(this.regionServer.serverName);
1486    rpcServer.start();
1487  }
1488
1489  void stop() {
1490    if (accessChecker != null) {
1491      accessChecker.stop();
1492    }
1493    closeAllScanners();
1494    rpcServer.stop();
1495  }
1496
1497  /**
1498   * Called to verify that this server is up and running.
1499   */
1500  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
1501  protected void checkOpen() throws IOException {
1502    if (regionServer.isAborted()) {
1503      throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
1504    }
1505    if (regionServer.isStopped()) {
1506      throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
1507    }
1508    if (!regionServer.fsOk) {
1509      throw new RegionServerStoppedException("File system not available");
1510    }
1511    if (!regionServer.isOnline()) {
1512      throw new ServerNotRunningYetException("Server " + regionServer.serverName
1513          + " is not running yet");
1514    }
1515  }
1516
1517  /**
1518   * By default, put up an Admin and a Client Service.
1519   * Set booleans <code>hbase.regionserver.admin.executorService</code> and
1520   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
1521   * Default is that both are enabled.
1522   * @return immutable list of blocking services and the security info classes that this server
1523   * supports
1524   */
1525  protected List<BlockingServiceAndInterface> getServices() {
1526    boolean admin =
1527      getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
1528    boolean client =
1529      getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
1530    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
1531    if (client) {
1532      bssi.add(new BlockingServiceAndInterface(
1533      ClientService.newReflectiveBlockingService(this),
1534      ClientService.BlockingInterface.class));
1535    }
1536    if (admin) {
1537      bssi.add(new BlockingServiceAndInterface(
1538      AdminService.newReflectiveBlockingService(this),
1539      AdminService.BlockingInterface.class));
1540    }
1541    return new org.apache.hbase.thirdparty.com.google.common.collect.
1542        ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
1543  }
1544
1545  public InetSocketAddress getSocketAddress() {
1546    return isa;
1547  }
1548
1549  @Override
1550  public int getPriority(RequestHeader header, Message param, User user) {
1551    return priority.getPriority(header, param, user);
1552  }
1553
1554  @Override
1555  public long getDeadline(RequestHeader header, Message param) {
1556    return priority.getDeadline(header, param);
1557  }
1558
1559  /*
1560   * Check if an OOME and, if so, abort immediately to avoid creating more objects.
1561   *
1562   * @param e
1563   *
1564   * @return True if we OOME'd and are aborting.
1565   */
1566  @Override
1567  public boolean checkOOME(final Throwable e) {
1568    return exitIfOOME(e);
1569  }
1570
1571  public static boolean exitIfOOME(final Throwable e ){
1572    boolean stop = false;
1573    try {
1574      if (e instanceof OutOfMemoryError
1575          || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1576          || (e.getMessage() != null && e.getMessage().contains(
1577              "java.lang.OutOfMemoryError"))) {
1578        stop = true;
1579        LOG.error(HBaseMarkers.FATAL, "Run out of memory; "
1580          + RSRpcServices.class.getSimpleName() + " will abort itself immediately",
1581          e);
1582      }
1583    } finally {
1584      if (stop) {
1585        Runtime.getRuntime().halt(1);
1586      }
1587    }
1588    return stop;
1589  }
1590
1591  /**
1592   * Close a region on the region server.
1593   *
1594   * @param controller the RPC controller
1595   * @param request the request
1596   * @throws ServiceException
1597   */
1598  @Override
1599  @QosPriority(priority=HConstants.ADMIN_QOS)
1600  public CloseRegionResponse closeRegion(final RpcController controller,
1601      final CloseRegionRequest request) throws ServiceException {
1602    final ServerName sn = (request.hasDestinationServer() ?
1603      ProtobufUtil.toServerName(request.getDestinationServer()) : null);
1604
1605    try {
1606      checkOpen();
1607      throwOnWrongStartCode(request);
1608      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1609
1610      requestCount.increment();
1611      if (sn == null) {
1612        LOG.info("Close " + encodedRegionName + " without moving");
1613      } else {
1614        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1615      }
1616      boolean closed = regionServer.closeRegion(encodedRegionName, false, sn);
1617      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1618      return builder.build();
1619    } catch (IOException ie) {
1620      throw new ServiceException(ie);
1621    }
1622  }
1623
1624  /**
1625   * Compact a region on the region server.
1626   *
1627   * @param controller the RPC controller
1628   * @param request the request
1629   * @throws ServiceException
1630   */
1631  @Override
1632  @QosPriority(priority = HConstants.ADMIN_QOS)
1633  public CompactRegionResponse compactRegion(final RpcController controller,
1634      final CompactRegionRequest request) throws ServiceException {
1635    try {
1636      checkOpen();
1637      requestCount.increment();
1638      HRegion region = getRegion(request.getRegion());
1639      // Quota support is enabled, the requesting user is not system/super user
1640      // and a quota policy is enforced that disables compactions.
1641      if (QuotaUtil.isQuotaEnabled(getConfiguration()) &&
1642          !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null)) &&
1643          this.regionServer.getRegionServerSpaceQuotaManager()
1644              .areCompactionsDisabled(region.getTableDescriptor().getTableName())) {
1645        throw new DoNotRetryIOException(
1646            "Compactions on this region are " + "disabled due to a space quota violation.");
1647      }
1648      region.startRegionOperation(Operation.COMPACT_REGION);
1649      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1650      boolean major = request.hasMajor() && request.getMajor();
1651      if (request.hasFamily()) {
1652        byte[] family = request.getFamily().toByteArray();
1653        String log = "User-triggered " + (major ? "major " : "") + "compaction for region " +
1654            region.getRegionInfo().getRegionNameAsString() + " and family " +
1655            Bytes.toString(family);
1656        LOG.trace(log);
1657        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
1658          CompactionLifeCycleTracker.DUMMY);
1659      } else {
1660        String log = "User-triggered " + (major ? "major " : "") + "compaction for region " +
1661            region.getRegionInfo().getRegionNameAsString();
1662        LOG.trace(log);
1663        region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
1664      }
1665      return CompactRegionResponse.newBuilder().build();
1666    } catch (IOException ie) {
1667      throw new ServiceException(ie);
1668    }
1669  }
1670
1671  @Override
1672  public CompactionSwitchResponse compactionSwitch(RpcController controller,
1673      CompactionSwitchRequest request) throws ServiceException {
1674    try {
1675      checkOpen();
1676      requestCount.increment();
1677      boolean prevState = regionServer.compactSplitThread.isCompactionsEnabled();
1678      CompactionSwitchResponse response =
1679          CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
1680      if (prevState == request.getEnabled()) {
1681        // passed in requested state is same as current state. No action required
1682        return response;
1683      }
1684      regionServer.compactSplitThread.switchCompaction(request.getEnabled());
1685      return response;
1686    } catch (IOException ie) {
1687      throw new ServiceException(ie);
1688    }
1689  }
1690
1691  /**
1692   * Flush a region on the region server.
1693   *
1694   * @param controller the RPC controller
1695   * @param request the request
1696   * @throws ServiceException
1697   */
1698  @Override
1699  @QosPriority(priority=HConstants.ADMIN_QOS)
1700  public FlushRegionResponse flushRegion(final RpcController controller,
1701      final FlushRegionRequest request) throws ServiceException {
1702    try {
1703      checkOpen();
1704      requestCount.increment();
1705      HRegion region = getRegion(request.getRegion());
1706      LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1707      boolean shouldFlush = true;
1708      if (request.hasIfOlderThanTs()) {
1709        shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1710      }
1711      FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1712      if (shouldFlush) {
1713        boolean writeFlushWalMarker =  request.hasWriteFlushWalMarker() ?
1714            request.getWriteFlushWalMarker() : false;
1715        // Go behind the curtain so we can manage writing of the flush WAL marker
1716        HRegion.FlushResultImpl flushResult =
1717            region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1718        boolean compactionNeeded = flushResult.isCompactionNeeded();
1719        if (compactionNeeded) {
1720          regionServer.compactSplitThread.requestSystemCompaction(region,
1721            "Compaction through user triggered flush");
1722        }
1723        builder.setFlushed(flushResult.isFlushSucceeded());
1724        builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1725      }
1726      builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1727      return builder.build();
1728    } catch (DroppedSnapshotException ex) {
1729      // Cache flush can fail in a few places. If it fails in a critical
1730      // section, we get a DroppedSnapshotException and a replay of wal
1731      // is required. Currently the only way to do this is a restart of
1732      // the server.
1733      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1734      throw new ServiceException(ex);
1735    } catch (IOException ie) {
1736      throw new ServiceException(ie);
1737    }
1738  }
1739
1740  @Override
1741  @QosPriority(priority=HConstants.ADMIN_QOS)
1742  public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1743      final GetOnlineRegionRequest request) throws ServiceException {
1744    try {
1745      checkOpen();
1746      requestCount.increment();
1747      Map<String, HRegion> onlineRegions = regionServer.onlineRegions;
1748      List<RegionInfo> list = new ArrayList<>(onlineRegions.size());
1749      for (HRegion region: onlineRegions.values()) {
1750        list.add(region.getRegionInfo());
1751      }
1752      Collections.sort(list, RegionInfo.COMPARATOR);
1753      return ResponseConverter.buildGetOnlineRegionResponse(list);
1754    } catch (IOException ie) {
1755      throw new ServiceException(ie);
1756    }
1757  }
1758
1759  // Master implementation of this Admin Service differs given it is not
1760  // able to supply detail only known to RegionServer. See note on
1761  // MasterRpcServers#getRegionInfo.
1762  @Override
1763  @QosPriority(priority=HConstants.ADMIN_QOS)
1764  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
1765      final GetRegionInfoRequest request) throws ServiceException {
1766    try {
1767      checkOpen();
1768      requestCount.increment();
1769      HRegion region = getRegion(request.getRegion());
1770      RegionInfo info = region.getRegionInfo();
1771      byte[] bestSplitRow = null;
1772      boolean shouldSplit = true;
1773      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
1774        HRegion r = region;
1775        region.startRegionOperation(Operation.SPLIT_REGION);
1776        r.forceSplit(null);
1777        // Even after setting force split if split policy says no to split then we should not split.
1778        shouldSplit = region.getSplitPolicy().shouldSplit() && !info.isMetaRegion();
1779        bestSplitRow = r.checkSplit();
1780        // when all table data are in memstore, bestSplitRow = null
1781        // try to flush region first
1782        if(bestSplitRow == null) {
1783          r.flush(true);
1784          bestSplitRow = r.checkSplit();
1785        }
1786        r.clearSplit();
1787      }
1788      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
1789      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
1790      if (request.hasCompactionState() && request.getCompactionState()) {
1791        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
1792      }
1793      builder.setSplittable(region.isSplittable() && shouldSplit);
1794      builder.setMergeable(region.isMergeable());
1795      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
1796        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
1797      }
1798      return builder.build();
1799    } catch (IOException ie) {
1800      throw new ServiceException(ie);
1801    }
1802  }
1803
1804  @Override
1805  @QosPriority(priority=HConstants.ADMIN_QOS)
1806  public GetRegionLoadResponse getRegionLoad(RpcController controller,
1807      GetRegionLoadRequest request) throws ServiceException {
1808
1809    List<HRegion> regions;
1810    if (request.hasTableName()) {
1811      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
1812      regions = regionServer.getRegions(tableName);
1813    } else {
1814      regions = regionServer.getRegions();
1815    }
1816    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
1817    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
1818    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1819
1820    try {
1821      for (HRegion region : regions) {
1822        rLoads.add(regionServer.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
1823      }
1824    } catch (IOException e) {
1825      throw new ServiceException(e);
1826    }
1827    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
1828    builder.addAllRegionLoads(rLoads);
1829    return builder.build();
1830  }
1831
1832  @Override
1833  @QosPriority(priority=HConstants.ADMIN_QOS)
1834  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
1835    ClearCompactionQueuesRequest request) throws ServiceException {
1836    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
1837        + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
1838    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
1839    requestCount.increment();
1840    if (clearCompactionQueues.compareAndSet(false,true)) {
1841      try {
1842        checkOpen();
1843        regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues();
1844        for (String queueName : request.getQueueNameList()) {
1845          LOG.debug("clear " + queueName + " compaction queue");
1846          switch (queueName) {
1847            case "long":
1848              regionServer.compactSplitThread.clearLongCompactionsQueue();
1849              break;
1850            case "short":
1851              regionServer.compactSplitThread.clearShortCompactionsQueue();
1852              break;
1853            default:
1854              LOG.warn("Unknown queue name " + queueName);
1855              throw new IOException("Unknown queue name " + queueName);
1856          }
1857        }
1858        regionServer.getRegionServerCoprocessorHost().postClearCompactionQueues();
1859      } catch (IOException ie) {
1860        throw new ServiceException(ie);
1861      } finally {
1862        clearCompactionQueues.set(false);
1863      }
1864    } else {
1865      LOG.warn("Clear compactions queue is executing by other admin.");
1866    }
1867    return respBuilder.build();
1868  }
1869
1870  /**
1871   * Get some information of the region server.
1872   *
1873   * @param controller the RPC controller
1874   * @param request the request
1875   * @throws ServiceException
1876   */
1877  @Override
1878  @QosPriority(priority=HConstants.ADMIN_QOS)
1879  public GetServerInfoResponse getServerInfo(final RpcController controller,
1880      final GetServerInfoRequest request) throws ServiceException {
1881    try {
1882      checkOpen();
1883    } catch (IOException ie) {
1884      throw new ServiceException(ie);
1885    }
1886    requestCount.increment();
1887    int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
1888    return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
1889  }
1890
1891  @Override
1892  @QosPriority(priority=HConstants.ADMIN_QOS)
1893  public GetStoreFileResponse getStoreFile(final RpcController controller,
1894      final GetStoreFileRequest request) throws ServiceException {
1895    try {
1896      checkOpen();
1897      HRegion region = getRegion(request.getRegion());
1898      requestCount.increment();
1899      Set<byte[]> columnFamilies;
1900      if (request.getFamilyCount() == 0) {
1901        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
1902      } else {
1903        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
1904        for (ByteString cf: request.getFamilyList()) {
1905          columnFamilies.add(cf.toByteArray());
1906        }
1907      }
1908      int nCF = columnFamilies.size();
1909      List<String>  fileList = region.getStoreFileList(
1910        columnFamilies.toArray(new byte[nCF][]));
1911      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1912      builder.addAllStoreFile(fileList);
1913      return builder.build();
1914    } catch (IOException ie) {
1915      throw new ServiceException(ie);
1916    }
1917  }
1918
1919  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
1920    if (!request.hasServerStartCode()) {
1921      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
1922      return;
1923    }
1924    throwOnWrongStartCode(request.getServerStartCode());
1925  }
1926
1927  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
1928    if (!request.hasServerStartCode()) {
1929      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
1930      return;
1931    }
1932    throwOnWrongStartCode(request.getServerStartCode());
1933  }
1934
1935  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
1936    // check that we are the same server that this RPC is intended for.
1937    if (regionServer.serverName.getStartcode() != serverStartCode) {
1938      throw new ServiceException(new DoNotRetryIOException(
1939        "This RPC was intended for a " + "different server with startCode: " + serverStartCode +
1940          ", this server is: " + regionServer.serverName));
1941    }
1942  }
1943
1944  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
1945    if (req.getOpenRegionCount() > 0) {
1946      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
1947        throwOnWrongStartCode(openReq);
1948      }
1949    }
1950    if (req.getCloseRegionCount() > 0) {
1951      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
1952        throwOnWrongStartCode(closeReq);
1953      }
1954    }
1955  }
1956
1957  /**
1958   * Open asynchronously a region or a set of regions on the region server.
1959   *
1960   * The opening is coordinated by ZooKeeper, and this method requires the znode to be created
1961   *  before being called. As a consequence, this method should be called only from the master.
1962   * <p>
1963   * Different manages states for the region are:
1964   * </p><ul>
1965   *  <li>region not opened: the region opening will start asynchronously.</li>
1966   *  <li>a close is already in progress: this is considered as an error.</li>
1967   *  <li>an open is already in progress: this new open request will be ignored. This is important
1968   *  because the Master can do multiple requests if it crashes.</li>
1969   *  <li>the region is already opened:  this new open request will be ignored.</li>
1970   *  </ul>
1971   * <p>
1972   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
1973   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
1974   * errors are put in the response as FAILED_OPENING.
1975   * </p>
1976   * @param controller the RPC controller
1977   * @param request the request
1978   * @throws ServiceException
1979   */
1980  @Override
1981  @QosPriority(priority=HConstants.ADMIN_QOS)
1982  public OpenRegionResponse openRegion(final RpcController controller,
1983      final OpenRegionRequest request) throws ServiceException {
1984    requestCount.increment();
1985    throwOnWrongStartCode(request);
1986
1987    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
1988    final int regionCount = request.getOpenInfoCount();
1989    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
1990    final boolean isBulkAssign = regionCount > 1;
1991    try {
1992      checkOpen();
1993    } catch (IOException ie) {
1994      TableName tableName = null;
1995      if (regionCount == 1) {
1996        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri = request.getOpenInfo(0).getRegion();
1997        if (ri != null) {
1998          tableName = ProtobufUtil.toTableName(ri.getTableName());
1999        }
2000      }
2001      if (!TableName.META_TABLE_NAME.equals(tableName)) {
2002        throw new ServiceException(ie);
2003      }
2004      // We are assigning meta, wait a little for regionserver to finish initialization.
2005      int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
2006        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout
2007      long endTime = System.currentTimeMillis() + timeout;
2008      synchronized (regionServer.online) {
2009        try {
2010          while (System.currentTimeMillis() <= endTime
2011              && !regionServer.isStopped() && !regionServer.isOnline()) {
2012            regionServer.online.wait(regionServer.msgInterval);
2013          }
2014          checkOpen();
2015        } catch (InterruptedException t) {
2016          Thread.currentThread().interrupt();
2017          throw new ServiceException(t);
2018        } catch (IOException e) {
2019          throw new ServiceException(e);
2020        }
2021      }
2022    }
2023
2024    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
2025
2026    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
2027      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
2028      TableDescriptor htd;
2029      try {
2030        String encodedName = region.getEncodedName();
2031        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
2032        final HRegion onlineRegion = regionServer.getRegion(encodedName);
2033        if (onlineRegion != null) {
2034          // The region is already online. This should not happen any more.
2035          String error = "Received OPEN for the region:"
2036            + region.getRegionNameAsString() + ", which is already online";
2037          LOG.warn(error);
2038          //regionServer.abort(error);
2039          //throw new IOException(error);
2040          builder.addOpeningState(RegionOpeningState.OPENED);
2041          continue;
2042        }
2043        LOG.info("Open " + region.getRegionNameAsString());
2044
2045        final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent(
2046          encodedNameBytes, Boolean.TRUE);
2047
2048        if (Boolean.FALSE.equals(previous)) {
2049          if (regionServer.getRegion(encodedName) != null) {
2050            // There is a close in progress. This should not happen any more.
2051            String error = "Received OPEN for the region:"
2052              + region.getRegionNameAsString() + ", which we are already trying to CLOSE";
2053            regionServer.abort(error);
2054            throw new IOException(error);
2055          }
2056          regionServer.regionsInTransitionInRS.put(encodedNameBytes, Boolean.TRUE);
2057        }
2058
2059        if (Boolean.TRUE.equals(previous)) {
2060          // An open is in progress. This is supported, but let's log this.
2061          LOG.info("Receiving OPEN for the region:" +
2062            region.getRegionNameAsString() + ", which we are already trying to OPEN"
2063              + " - ignoring this new request for this region.");
2064        }
2065
2066        // We are opening this region. If it moves back and forth for whatever reason, we don't
2067        // want to keep returning the stale moved record while we are opening/if we close again.
2068        regionServer.removeFromMovedRegions(region.getEncodedName());
2069
2070        if (previous == null || !previous.booleanValue()) {
2071          htd = htds.get(region.getTable());
2072          if (htd == null) {
2073            htd = regionServer.tableDescriptors.get(region.getTable());
2074            htds.put(region.getTable(), htd);
2075          }
2076          if (htd == null) {
2077            throw new IOException("Missing table descriptor for " + region.getEncodedName());
2078          }
2079          // If there is no action in progress, we can submit a specific handler.
2080          // Need to pass the expected version in the constructor.
2081          if (regionServer.executorService == null) {
2082            LOG.info("No executor executorService; skipping open request");
2083          } else {
2084            if (region.isMetaRegion()) {
2085              regionServer.executorService.submit(new OpenMetaHandler(
2086              regionServer, regionServer, region, htd, masterSystemTime));
2087            } else {
2088              if (regionOpenInfo.getFavoredNodesCount() > 0) {
2089                regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
2090                regionOpenInfo.getFavoredNodesList());
2091              }
2092              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
2093                regionServer.executorService.submit(new OpenPriorityRegionHandler(
2094                regionServer, regionServer, region, htd, masterSystemTime));
2095              } else {
2096                regionServer.executorService.submit(new OpenRegionHandler(
2097                regionServer, regionServer, region, htd, masterSystemTime));
2098              }
2099            }
2100          }
2101        }
2102
2103        builder.addOpeningState(RegionOpeningState.OPENED);
2104      } catch (IOException ie) {
2105        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
2106        if (isBulkAssign) {
2107          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
2108        } else {
2109          throw new ServiceException(ie);
2110        }
2111      }
2112    }
2113    return builder.build();
2114  }
2115
2116  /**
2117   *  Wamrmup a region on this server.
2118   *
2119   * This method should only be called by Master. It synchrnously opens the region and
2120   * closes the region bringing the most important pages in cache.
2121   * <p>
2122   *
2123   * @param controller the RPC controller
2124   * @param request the request
2125   * @throws ServiceException
2126   */
2127  @Override
2128  public WarmupRegionResponse warmupRegion(final RpcController controller,
2129      final WarmupRegionRequest request) throws ServiceException {
2130
2131    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
2132    TableDescriptor htd;
2133    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
2134
2135    try {
2136      checkOpen();
2137      String encodedName = region.getEncodedName();
2138      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
2139      final HRegion onlineRegion = regionServer.getRegion(encodedName);
2140
2141      if (onlineRegion != null) {
2142        LOG.info("Region already online. Skipping warming up " + region);
2143        return response;
2144      }
2145
2146      htd = regionServer.tableDescriptors.get(region.getTable());
2147
2148      if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
2149        LOG.info("Region is in transition. Skipping warmup " + region);
2150        return response;
2151      }
2152
2153      LOG.info("Warming up region " + region.getRegionNameAsString());
2154      HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
2155          regionServer.getConfiguration(), regionServer, null);
2156
2157    } catch (IOException ie) {
2158      LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
2159      throw new ServiceException(ie);
2160    }
2161
2162    return response;
2163  }
2164
2165  /**
2166   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
2167   * that the given mutations will be durable on the receiving RS if this method returns without any
2168   * exception.
2169   * @param controller the RPC controller
2170   * @param request the request
2171   * @throws ServiceException
2172   */
2173  @Override
2174  @QosPriority(priority = HConstants.REPLAY_QOS)
2175  public ReplicateWALEntryResponse replay(final RpcController controller,
2176      final ReplicateWALEntryRequest request) throws ServiceException {
2177    long before = EnvironmentEdgeManager.currentTime();
2178    CellScanner cells = ((HBaseRpcController) controller).cellScanner();
2179    try {
2180      checkOpen();
2181      List<WALEntry> entries = request.getEntryList();
2182      if (entries == null || entries.isEmpty()) {
2183        // empty input
2184        return ReplicateWALEntryResponse.newBuilder().build();
2185      }
2186      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
2187      HRegion region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
2188      RegionCoprocessorHost coprocessorHost =
2189          ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
2190            ? region.getCoprocessorHost()
2191            : null; // do not invoke coprocessors if this is a secondary region replica
2192      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();
2193
2194      // Skip adding the edits to WAL if this is a secondary region replica
2195      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
2196      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
2197
2198      for (WALEntry entry : entries) {
2199        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
2200          throw new NotServingRegionException("Replay request contains entries from multiple " +
2201              "regions. First region:" + regionName.toStringUtf8() + " , other region:"
2202              + entry.getKey().getEncodedRegionName());
2203        }
2204        if (regionServer.nonceManager != null && isPrimary) {
2205          long nonceGroup = entry.getKey().hasNonceGroup()
2206            ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
2207          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
2208          regionServer.nonceManager.reportOperationFromWal(
2209              nonceGroup,
2210              nonce,
2211              entry.getKey().getWriteTime());
2212        }
2213        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
2214        List<MutationReplay> edits = WALSplitUtil.getMutationsFromWALEntry(entry,
2215          cells, walEntry, durability);
2216        if (coprocessorHost != null) {
2217          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
2218          // KeyValue.
2219          if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
2220            walEntry.getSecond())) {
2221            // if bypass this log entry, ignore it ...
2222            continue;
2223          }
2224          walEntries.add(walEntry);
2225        }
2226        if(edits!=null && !edits.isEmpty()) {
2227          // HBASE-17924
2228          // sort to improve lock efficiency
2229          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
2230          long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
2231            entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
2232          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
2233          // check if it's a partial success
2234          for (int i = 0; result != null && i < result.length; i++) {
2235            if (result[i] != OperationStatus.SUCCESS) {
2236              throw new IOException(result[i].getExceptionMsg());
2237            }
2238          }
2239        }
2240      }
2241
2242      //sync wal at the end because ASYNC_WAL is used above
2243      WAL wal = region.getWAL();
2244      if (wal != null) {
2245        wal.sync();
2246      }
2247
2248      if (coprocessorHost != null) {
2249        for (Pair<WALKey, WALEdit> entry : walEntries) {
2250          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
2251            entry.getSecond());
2252        }
2253      }
2254      return ReplicateWALEntryResponse.newBuilder().build();
2255    } catch (IOException ie) {
2256      throw new ServiceException(ie);
2257    } finally {
2258      if (regionServer.metricsRegionServer != null) {
2259        regionServer.metricsRegionServer.updateReplay(
2260          EnvironmentEdgeManager.currentTime() - before);
2261      }
2262    }
2263  }
2264
2265  /**
2266   * Replicate WAL entries on the region server.
2267   *
2268   * @param controller the RPC controller
2269   * @param request the request
2270   * @throws ServiceException
2271   */
2272  @Override
2273  @QosPriority(priority=HConstants.REPLICATION_QOS)
2274  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
2275      final ReplicateWALEntryRequest request) throws ServiceException {
2276    try {
2277      checkOpen();
2278      if (regionServer.replicationSinkHandler != null) {
2279        requestCount.increment();
2280        List<WALEntry> entries = request.getEntryList();
2281        CellScanner cellScanner = ((HBaseRpcController)controller).cellScanner();
2282        regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries();
2283        regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner,
2284          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
2285          request.getSourceHFileArchiveDirPath());
2286        regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries();
2287        return ReplicateWALEntryResponse.newBuilder().build();
2288      } else {
2289        throw new ServiceException("Replication services are not initialized yet");
2290      }
2291    } catch (IOException ie) {
2292      throw new ServiceException(ie);
2293    }
2294  }
2295
2296  /**
2297   * Roll the WAL writer of the region server.
2298   * @param controller the RPC controller
2299   * @param request the request
2300   * @throws ServiceException
2301   */
2302  @Override
2303  public RollWALWriterResponse rollWALWriter(final RpcController controller,
2304      final RollWALWriterRequest request) throws ServiceException {
2305    try {
2306      checkOpen();
2307      requestCount.increment();
2308      regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
2309      regionServer.walRoller.requestRollAll();
2310      regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
2311      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
2312      return builder.build();
2313    } catch (IOException ie) {
2314      throw new ServiceException(ie);
2315    }
2316  }
2317
2318
2319  /**
2320   * Stop the region server.
2321   *
2322   * @param controller the RPC controller
2323   * @param request the request
2324   * @throws ServiceException
2325   */
2326  @Override
2327  @QosPriority(priority=HConstants.ADMIN_QOS)
2328  public StopServerResponse stopServer(final RpcController controller,
2329      final StopServerRequest request) throws ServiceException {
2330    requestCount.increment();
2331    String reason = request.getReason();
2332    regionServer.stop(reason);
2333    return StopServerResponse.newBuilder().build();
2334  }
2335
2336  @Override
2337  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
2338      UpdateFavoredNodesRequest request) throws ServiceException {
2339    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
2340    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
2341    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
2342      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
2343      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
2344        regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
2345          regionUpdateInfo.getFavoredNodesList());
2346      }
2347    }
2348    respBuilder.setResponse(openInfoList.size());
2349    return respBuilder.build();
2350  }
2351
2352  /**
2353   * Atomically bulk load several HFiles into an open region
2354   * @return true if successful, false is failed but recoverably (no action)
2355   * @throws ServiceException if failed unrecoverably
2356   */
2357  @Override
2358  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
2359      final BulkLoadHFileRequest request) throws ServiceException {
2360    long start = EnvironmentEdgeManager.currentTime();
2361    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
2362    if(clusterIds.contains(this.regionServer.clusterId)){
2363      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
2364    } else {
2365      clusterIds.add(this.regionServer.clusterId);
2366    }
2367    try {
2368      checkOpen();
2369      requestCount.increment();
2370      HRegion region = getRegion(request.getRegion());
2371      Map<byte[], List<Path>> map = null;
2372      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
2373      long sizeToBeLoaded = -1;
2374
2375      // Check to see if this bulk load would exceed the space quota for this table
2376      if (spaceQuotaEnabled) {
2377        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
2378        SpaceViolationPolicyEnforcement enforcement = activeSpaceQuotas.getPolicyEnforcement(
2379            region);
2380        if (enforcement != null) {
2381          // Bulk loads must still be atomic. We must enact all or none.
2382          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
2383          for (FamilyPath familyPath : request.getFamilyPathList()) {
2384            filePaths.add(familyPath.getPath());
2385          }
2386          // Check if the batch of files exceeds the current quota
2387          sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths);
2388        }
2389      }
2390
2391      List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
2392      for (FamilyPath familyPath : request.getFamilyPathList()) {
2393        familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath()));
2394      }
2395      if (!request.hasBulkToken()) {
2396        if (region.getCoprocessorHost() != null) {
2397          region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
2398        }
2399        try {
2400          map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
2401              request.getCopyFile(), clusterIds, request.getReplicate());
2402        } finally {
2403          if (region.getCoprocessorHost() != null) {
2404            region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
2405          }
2406        }
2407      } else {
2408        // secure bulk load
2409        map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request, clusterIds);
2410      }
2411      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
2412      builder.setLoaded(map != null);
2413      if (map != null) {
2414        // Treat any negative size as a flag to "ignore" updating the region size as that is
2415        // not possible to occur in real life (cannot bulk load a file with negative size)
2416        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
2417          if (LOG.isTraceEnabled()) {
2418            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
2419                + sizeToBeLoaded + " bytes");
2420          }
2421          // Inform space quotas of the new files for this region
2422          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(
2423              region.getRegionInfo(), sizeToBeLoaded);
2424        }
2425      }
2426      return builder.build();
2427    } catch (IOException ie) {
2428      throw new ServiceException(ie);
2429    } finally {
2430      if (regionServer.metricsRegionServer != null) {
2431        regionServer.metricsRegionServer.updateBulkLoad(
2432            EnvironmentEdgeManager.currentTime() - start);
2433      }
2434    }
2435  }
2436
2437  @Override
2438  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
2439      PrepareBulkLoadRequest request) throws ServiceException {
2440    try {
2441      checkOpen();
2442      requestCount.increment();
2443
2444      HRegion region = getRegion(request.getRegion());
2445
2446      String bulkToken = regionServer.secureBulkLoadManager.prepareBulkLoad(region, request);
2447      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
2448      builder.setBulkToken(bulkToken);
2449      return builder.build();
2450    } catch (IOException ie) {
2451      throw new ServiceException(ie);
2452    }
2453  }
2454
2455  @Override
2456  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
2457      CleanupBulkLoadRequest request) throws ServiceException {
2458    try {
2459      checkOpen();
2460      requestCount.increment();
2461
2462      HRegion region = getRegion(request.getRegion());
2463
2464      regionServer.secureBulkLoadManager.cleanupBulkLoad(region, request);
2465      CleanupBulkLoadResponse response = CleanupBulkLoadResponse.newBuilder().build();
2466      return response;
2467    } catch (IOException ie) {
2468      throw new ServiceException(ie);
2469    }
2470  }
2471
2472  @Override
2473  public CoprocessorServiceResponse execService(final RpcController controller,
2474      final CoprocessorServiceRequest request) throws ServiceException {
2475    try {
2476      checkOpen();
2477      requestCount.increment();
2478      HRegion region = getRegion(request.getRegion());
2479      com.google.protobuf.Message result = execServiceOnRegion(region, request.getCall());
2480      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
2481      builder.setRegion(RequestConverter.buildRegionSpecifier(
2482        RegionSpecifierType.REGION_NAME, region.getRegionInfo().getRegionName()));
2483      // TODO: COPIES!!!!!!
2484      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).
2485        setValue(org.apache.hbase.thirdparty.com.google.protobuf.ByteString.
2486            copyFrom(result.toByteArray())));
2487      return builder.build();
2488    } catch (IOException ie) {
2489      throw new ServiceException(ie);
2490    }
2491  }
2492
2493  private com.google.protobuf.Message execServiceOnRegion(HRegion region,
2494      final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
2495    // ignore the passed in controller (from the serialized call)
2496    ServerRpcController execController = new ServerRpcController();
2497    return region.execService(execController, serviceCall);
2498  }
2499
2500  /**
2501   * Get data from a table.
2502   *
2503   * @param controller the RPC controller
2504   * @param request the get request
2505   * @throws ServiceException
2506   */
2507  @Override
2508  public GetResponse get(final RpcController controller,
2509      final GetRequest request) throws ServiceException {
2510    long before = EnvironmentEdgeManager.currentTime();
2511    OperationQuota quota = null;
2512    HRegion region = null;
2513    try {
2514      checkOpen();
2515      requestCount.increment();
2516      rpcGetRequestCount.increment();
2517      region = getRegion(request.getRegion());
2518
2519      GetResponse.Builder builder = GetResponse.newBuilder();
2520      ClientProtos.Get get = request.getGet();
2521      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
2522      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
2523      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
2524      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
2525      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
2526        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? " +
2527            "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " +
2528            "reverse Scan.");
2529      }
2530      Boolean existence = null;
2531      Result r = null;
2532      RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2533      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
2534
2535      Get clientGet = ProtobufUtil.toGet(get);
2536      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2537        existence = region.getCoprocessorHost().preExists(clientGet);
2538      }
2539      if (existence == null) {
2540        if (context != null) {
2541          r = get(clientGet, (region), null, context);
2542        } else {
2543          // for test purpose
2544          r = region.get(clientGet);
2545        }
2546        if (get.getExistenceOnly()) {
2547          boolean exists = r.getExists();
2548          if (region.getCoprocessorHost() != null) {
2549            exists = region.getCoprocessorHost().postExists(clientGet, exists);
2550          }
2551          existence = exists;
2552        }
2553      }
2554      if (existence != null) {
2555        ClientProtos.Result pbr =
2556            ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
2557        builder.setResult(pbr);
2558      } else if (r != null) {
2559        ClientProtos.Result pbr;
2560        if (isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
2561            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)) {
2562          pbr = ProtobufUtil.toResultNoData(r);
2563          ((HBaseRpcController) controller).setCellScanner(CellUtil.createCellScanner(r
2564              .rawCells()));
2565          addSize(context, r, null);
2566        } else {
2567          pbr = ProtobufUtil.toResult(r);
2568        }
2569        builder.setResult(pbr);
2570      }
2571      //r.cells is null when an table.exists(get) call
2572      if (r != null && r.rawCells() != null) {
2573        quota.addGetResult(r);
2574      }
2575      return builder.build();
2576    } catch (IOException ie) {
2577      throw new ServiceException(ie);
2578    } finally {
2579      MetricsRegionServer mrs = regionServer.metricsRegionServer;
2580      if (mrs != null) {
2581        TableDescriptor td = region != null? region.getTableDescriptor(): null;
2582        if (td != null) {
2583          mrs.updateGet(td.getTableName(), EnvironmentEdgeManager.currentTime() - before);
2584        }
2585      }
2586      if (quota != null) {
2587        quota.close();
2588      }
2589    }
2590  }
2591
  /**
   * Server-side get that runs the coprocessor pre/post hooks and defers scanner close
   * to the RPC callback machinery instead of closing inline.
   *
   * NOTE(review): when {@code closeCallBack} is null this method dereferences
   * {@code context} in the finally block — callers appear to guarantee that at least
   * one of the two is non-null; confirm before calling with both null.
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
      RpcCallContext context) throws IOException {
    region.prepareGet(get);
    // Results from a non-default replica are marked stale for the client.
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    long before = EnvironmentEdgeManager.currentTime();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        // Coprocessor bypassed the core get; return whatever the hook populated.
        region.metricsUpdateForGet(results, before);
        return Result
            .create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
      }
    }
    // A get is executed as a single-row scan.
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet(results, before);

    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
  }
2642
2643  private void checkBatchSizeAndLogLargeSize(MultiRequest request) {
2644    int sum = 0;
2645    String firstRegionName = null;
2646    for (RegionAction regionAction : request.getRegionActionList()) {
2647      if (sum == 0) {
2648        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
2649      }
2650      sum += regionAction.getActionCount();
2651    }
2652    if (sum > rowSizeWarnThreshold) {
2653      ld.logBatchWarning(firstRegionName, sum, rowSizeWarnThreshold);
2654    }
2655  }
2656
2657  /**
2658   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2659   *
2660   * @param rpcc the RPC controller
2661   * @param request the multi request
2662   * @throws ServiceException
2663   */
2664  @Override
2665  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2666  throws ServiceException {
2667    try {
2668      checkOpen();
2669    } catch (IOException ie) {
2670      throw new ServiceException(ie);
2671    }
2672
2673    checkBatchSizeAndLogLargeSize(request);
2674
2675    // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
2676    // It is also the conduit via which we pass back data.
2677    HBaseRpcController controller = (HBaseRpcController)rpcc;
2678    CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
2679    if (controller != null) {
2680      controller.setCellScanner(null);
2681    }
2682
2683    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2684
2685    // this will contain all the cells that we need to return. It's created later, if needed.
2686    List<CellScannable> cellsToReturn = null;
2687    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2688    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2689    Boolean processed = null;
2690    RegionScannersCloseCallBack closeCallBack = null;
2691    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2692    this.rpcMultiRequestCount.increment();
2693    this.requestCount.increment();
2694    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new HashMap<>(request
2695      .getRegionActionCount());
2696    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2697    for (RegionAction regionAction : request.getRegionActionList()) {
2698      OperationQuota quota;
2699      HRegion region;
2700      regionActionResultBuilder.clear();
2701      RegionSpecifier regionSpecifier = regionAction.getRegion();
2702      try {
2703        region = getRegion(regionSpecifier);
2704        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
2705      } catch (IOException e) {
2706        rpcServer.getMetrics().exception(e);
2707        regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2708        responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2709        // All Mutations in this RegionAction not executed as we can not see the Region online here
2710        // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
2711        // corresponding to these Mutations.
2712        skipCellsForMutations(regionAction.getActionList(), cellScanner);
2713        continue;  // For this region it's a failure.
2714      }
2715
2716      if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2717        // How does this call happen?  It may need some work to play well w/ the surroundings.
2718        // Need to return an item per Action along w/ Action index.  TODO.
2719        try {
2720          if (request.hasCondition()) {
2721            Condition condition = request.getCondition();
2722            byte[] row = condition.getRow().toByteArray();
2723            byte[] family = condition.getFamily().toByteArray();
2724            byte[] qualifier = condition.getQualifier().toByteArray();
2725            CompareOperator op =
2726              CompareOperator.valueOf(condition.getCompareType().name());
2727            ByteArrayComparable comparator =
2728                ProtobufUtil.toComparator(condition.getComparator());
2729            TimeRange timeRange = condition.hasTimeRange() ?
2730              ProtobufUtil.toTimeRange(condition.getTimeRange()) :
2731              TimeRange.allTime();
2732            processed =
2733              checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
2734                qualifier, op, comparator, timeRange, regionActionResultBuilder,
2735                spaceQuotaEnforcement);
2736          } else {
2737            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
2738              cellScanner, spaceQuotaEnforcement);
2739            processed = Boolean.TRUE;
2740          }
2741        } catch (IOException e) {
2742          rpcServer.getMetrics().exception(e);
2743          // As it's atomic, we may expect it's a global failure.
2744          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2745        }
2746      } else {
2747        // doNonAtomicRegionMutation manages the exception internally
2748        if (context != null && closeCallBack == null) {
2749          // An RpcCallBack that creates a list of scanners that needs to perform callBack
2750          // operation on completion of multiGets.
2751          // Set this only once
2752          closeCallBack = new RegionScannersCloseCallBack();
2753          context.setCallBack(closeCallBack);
2754        }
2755        cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2756            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
2757            spaceQuotaEnforcement);
2758      }
2759      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2760      quota.close();
2761      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2762      if(regionLoadStats != null) {
2763        regionStats.put(regionSpecifier, regionLoadStats);
2764      }
2765    }
2766    // Load the controller with the Cells to return.
2767    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2768      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2769    }
2770
2771    if (processed != null) {
2772      responseBuilder.setProcessed(processed);
2773    }
2774
2775    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
2776    for(Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat: regionStats.entrySet()){
2777      builder.addRegion(stat.getKey());
2778      builder.addStat(stat.getValue());
2779    }
2780    responseBuilder.setRegionStatistics(builder);
2781    return responseBuilder.build();
2782  }
2783
2784  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2785    if (cellScanner == null) {
2786      return;
2787    }
2788    for (Action action : actions) {
2789      skipCellsForMutation(action, cellScanner);
2790    }
2791  }
2792
2793  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2794    if (cellScanner == null) {
2795      return;
2796    }
2797    try {
2798      if (action.hasMutation()) {
2799        MutationProto m = action.getMutation();
2800        if (m.hasAssociatedCellCount()) {
2801          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2802            cellScanner.advance();
2803          }
2804        }
2805      }
2806    } catch (IOException e) {
2807      // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2808      // marked as failed as we could not see the Region here. At client side the top level
2809      // RegionAction exception will be considered first.
2810      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2811    }
2812  }
2813
2814  /**
2815   * Mutate data in a table.
2816   *
2817   * @param rpcc the RPC controller
2818   * @param request the mutate request
2819   */
2820  @Override
2821  public MutateResponse mutate(final RpcController rpcc,
2822      final MutateRequest request) throws ServiceException {
2823    // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
2824    // It is also the conduit via which we pass back data.
2825    HBaseRpcController controller = (HBaseRpcController)rpcc;
2826    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2827    OperationQuota quota = null;
2828    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2829    ActivePolicyEnforcement spaceQuotaEnforcement = null;
2830    MutationType type = null;
2831    HRegion region = null;
2832    long before = EnvironmentEdgeManager.currentTime();
2833    // Clear scanner so we are not holding on to reference across call.
2834    if (controller != null) {
2835      controller.setCellScanner(null);
2836    }
2837    try {
2838      checkOpen();
2839      requestCount.increment();
2840      rpcMutateRequestCount.increment();
2841      region = getRegion(request.getRegion());
2842      MutateResponse.Builder builder = MutateResponse.newBuilder();
2843      MutationProto mutation = request.getMutation();
2844      if (!region.getRegionInfo().isMetaRegion()) {
2845        regionServer.cacheFlusher.reclaimMemStoreMemory();
2846      }
2847      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2848      Result r = null;
2849      Boolean processed = null;
2850      type = mutation.getMutateType();
2851
2852      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
2853      spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2854
2855      switch (type) {
2856        case APPEND:
2857          // TODO: this doesn't actually check anything.
2858          r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
2859          break;
2860        case INCREMENT:
2861          // TODO: this doesn't actually check anything.
2862          r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
2863          break;
2864        case PUT:
2865          Put put = ProtobufUtil.toPut(mutation, cellScanner);
2866          checkCellSizeLimit(region, put);
2867          // Throws an exception when violated
2868          spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
2869          quota.addMutation(put);
2870          if (request.hasCondition()) {
2871            Condition condition = request.getCondition();
2872            byte[] row = condition.getRow().toByteArray();
2873            byte[] family = condition.getFamily().toByteArray();
2874            byte[] qualifier = condition.getQualifier().toByteArray();
2875            CompareOperator compareOp =
2876              CompareOperator.valueOf(condition.getCompareType().name());
2877            ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
2878            TimeRange timeRange = condition.hasTimeRange() ?
2879              ProtobufUtil.toTimeRange(condition.getTimeRange()) :
2880              TimeRange.allTime();
2881            if (region.getCoprocessorHost() != null) {
2882              processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
2883                  compareOp, comparator, put);
2884            }
2885            if (processed == null) {
2886              boolean result = region.checkAndMutate(row, family,
2887                qualifier, compareOp, comparator, timeRange, put);
2888              if (region.getCoprocessorHost() != null) {
2889                result = region.getCoprocessorHost().postCheckAndPut(row, family,
2890                  qualifier, compareOp, comparator, put, result);
2891              }
2892              processed = result;
2893            }
2894          } else {
2895            region.put(put);
2896            processed = Boolean.TRUE;
2897          }
2898          break;
2899        case DELETE:
2900          Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
2901          checkCellSizeLimit(region, delete);
2902          spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
2903          quota.addMutation(delete);
2904          if (request.hasCondition()) {
2905            Condition condition = request.getCondition();
2906            byte[] row = condition.getRow().toByteArray();
2907            byte[] family = condition.getFamily().toByteArray();
2908            byte[] qualifier = condition.getQualifier().toByteArray();
2909            CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name());
2910            ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
2911            TimeRange timeRange = condition.hasTimeRange() ?
2912              ProtobufUtil.toTimeRange(condition.getTimeRange()) :
2913              TimeRange.allTime();
2914            if (region.getCoprocessorHost() != null) {
2915              processed = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op,
2916                  comparator, delete);
2917            }
2918            if (processed == null) {
2919              boolean result = region.checkAndMutate(row, family,
2920                qualifier, op, comparator, timeRange, delete);
2921              if (region.getCoprocessorHost() != null) {
2922                result = region.getCoprocessorHost().postCheckAndDelete(row, family,
2923                  qualifier, op, comparator, delete, result);
2924              }
2925              processed = result;
2926            }
2927          } else {
2928            region.delete(delete);
2929            processed = Boolean.TRUE;
2930          }
2931          break;
2932        default:
2933          throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
2934      }
2935      if (processed != null) {
2936        builder.setProcessed(processed.booleanValue());
2937      }
2938      boolean clientCellBlockSupported = isClientCellBlockSupport(context);
2939      addResult(builder, r, controller, clientCellBlockSupported);
2940      if (clientCellBlockSupported) {
2941        addSize(context, r, null);
2942      }
2943      return builder.build();
2944    } catch (IOException ie) {
2945      regionServer.checkFileSystem();
2946      throw new ServiceException(ie);
2947    } finally {
2948      if (quota != null) {
2949        quota.close();
2950      }
2951      // Update metrics
2952      if (regionServer.metricsRegionServer != null && type != null) {
2953        long after = EnvironmentEdgeManager.currentTime();
2954        switch (type) {
2955        case DELETE:
2956          if (request.hasCondition()) {
2957            regionServer.metricsRegionServer.updateCheckAndDelete(after - before);
2958          } else {
2959            regionServer.metricsRegionServer.updateDelete(
2960                region == null ? null : region.getRegionInfo().getTable(), after - before);
2961          }
2962          break;
2963        case PUT:
2964          if (request.hasCondition()) {
2965            regionServer.metricsRegionServer.updateCheckAndPut(after - before);
2966          } else {
2967            regionServer.metricsRegionServer.updatePut(
2968                region == null ? null : region.getRegionInfo().getTable(),after - before);
2969          }
2970          break;
2971        default:
2972          break;
2973
2974        }
2975      }
2976    }
2977  }
2978
  // This is used to keep compatible with the old client implementation. Consider remove it if we
  // decide to drop the support of the client that still sends close request to a region scanner
  // which has already been exhausted.
  // NOTE: this is a shared singleton exception; callers compare against it by reference
  // (e.g. `e == SCANNER_ALREADY_CLOSED` in scan()), so the suppressed stack trace below is fine.
  @Deprecated
  private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {

    private static final long serialVersionUID = -4305297078988180130L;

    @Override
    public synchronized Throwable fillInStackTrace() {
      // Suppress stack trace capture: the instance is shared and a trace would be meaningless.
      return this;
    }
  };
2992
2993  private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
2994    String scannerName = Long.toString(request.getScannerId());
2995    RegionScannerHolder rsh = scanners.get(scannerName);
2996    if (rsh == null) {
2997      // just ignore the next or close request if scanner does not exists.
2998      if (closedScanners.getIfPresent(scannerName) != null) {
2999        throw SCANNER_ALREADY_CLOSED;
3000      } else {
3001        LOG.warn("Client tried to access missing scanner " + scannerName);
3002        throw new UnknownScannerException(
3003            "Unknown scanner '" + scannerName + "'. This can happen due to any of the following " +
3004                "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " +
3005                "long wait between consecutive client checkins, c) Server may be closing down, " +
3006                "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " +
3007                "possible fix would be increasing the value of" +
3008                "'hbase.client.scanner.timeout.period' configuration.");
3009      }
3010    }
3011    RegionInfo hri = rsh.s.getRegionInfo();
3012    // Yes, should be the same instance
3013    if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) {
3014      String msg = "Region has changed on the scanner " + scannerName + ": regionName="
3015          + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r;
3016      LOG.warn(msg + ", closing...");
3017      scanners.remove(scannerName);
3018      try {
3019        rsh.s.close();
3020      } catch (IOException e) {
3021        LOG.warn("Getting exception closing " + scannerName, e);
3022      } finally {
3023        try {
3024          regionServer.leases.cancelLease(scannerName);
3025        } catch (LeaseException e) {
3026          LOG.warn("Getting exception closing " + scannerName, e);
3027        }
3028      }
3029      throw new NotServingRegionException(msg);
3030    }
3031    return rsh;
3032  }
3033
3034  private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.Builder builder)
3035      throws IOException {
3036    HRegion region = getRegion(request.getRegion());
3037    ClientProtos.Scan protoScan = request.getScan();
3038    boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
3039    Scan scan = ProtobufUtil.toScan(protoScan);
3040    // if the request doesn't set this, get the default region setting.
3041    if (!isLoadingCfsOnDemandSet) {
3042      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
3043    }
3044
3045    if (!scan.hasFamilies()) {
3046      // Adding all families to scanner
3047      for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
3048        scan.addFamily(family);
3049      }
3050    }
3051    if (region.getCoprocessorHost() != null) {
3052      // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
3053      // wrapper for the core created RegionScanner
3054      region.getCoprocessorHost().preScannerOpen(scan);
3055    }
3056    RegionScannerImpl coreScanner = region.getScanner(scan);
3057    Shipper shipper = coreScanner;
3058    RegionScanner scanner = coreScanner;
3059    if (region.getCoprocessorHost() != null) {
3060      scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
3061    }
3062    long scannerId = scannerIdGenerator.generateNewScannerId();
3063    builder.setScannerId(scannerId);
3064    builder.setMvccReadPoint(scanner.getMvccReadPoint());
3065    builder.setTtl(scannerLeaseTimeoutPeriod);
3066    String scannerName = String.valueOf(scannerId);
3067    return addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult());
3068  }
3069
3070  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
3071      throws OutOfOrderScannerNextException {
3072    // if nextCallSeq does not match throw Exception straight away. This needs to be
3073    // performed even before checking of Lease.
3074    // See HBASE-5974
3075    if (request.hasNextCallSeq()) {
3076      long callSeq = request.getNextCallSeq();
3077      if (!rsh.incNextCallSeq(callSeq)) {
3078        throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.getNextCallSeq()
3079            + " But the nextCallSeq got from client: " + request.getNextCallSeq() + "; request="
3080            + TextFormat.shortDebugString(request));
3081      }
3082    }
3083  }
3084
3085  private void addScannerLeaseBack(Leases.Lease lease) {
3086    try {
3087      regionServer.leases.addLease(lease);
3088    } catch (LeaseStillHeldException e) {
3089      // should not happen as the scanner id is unique.
3090      throw new AssertionError(e);
3091    }
3092  }
3093
3094  private long getTimeLimit(HBaseRpcController controller, boolean allowHeartbeatMessages) {
3095    // Set the time limit to be half of the more restrictive timeout value (one of the
3096    // timeout values must be positive). In the event that both values are positive, the
3097    // more restrictive of the two is used to calculate the limit.
3098    if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
3099      long timeLimitDelta;
3100      if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
3101        timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
3102      } else {
3103        timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
3104      }
3105      if (controller != null && controller.getCallTimeout() > 0) {
3106        timeLimitDelta = Math.min(timeLimitDelta, controller.getCallTimeout());
3107      }
3108      // Use half of whichever timeout value was more restrictive... But don't allow
3109      // the time limit to be less than the allowable minimum (could cause an
3110      // immediatate timeout before scanning any data).
3111      timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
3112      // XXX: Can not use EnvironmentEdge here because TestIncrementTimeRange use a
3113      // ManualEnvironmentEdge. Consider using System.nanoTime instead.
3114      return System.currentTimeMillis() + timeLimitDelta;
3115    }
3116    // Default value of timeLimit is negative to indicate no timeLimit should be
3117    // enforced.
3118    return -1L;
3119  }
3120
3121  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
3122      ScannerContext scannerContext, ScanResponse.Builder builder) {
3123    if (numOfCompleteRows >= limitOfRows) {
3124      if (LOG.isTraceEnabled()) {
3125        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows +
3126            " scannerContext: " + scannerContext);
3127      }
3128      builder.setMoreResults(false);
3129    }
3130  }
3131
  /**
   * Core scanner loop for a single scan rpc call: drives the region scanner until the result
   * count, size or time limit is reached, appending Results to {@code results} and recording
   * moreResultsInRegion / heartbeat / cursor / metrics state on the response {@code builder}.
   *
   * @param controller used to derive the per-call time limit for heartbeat checks
   * @param request carries the client's partial/heartbeat/metrics capabilities
   * @param rsh holder with the region, the scanner and partial-row bookkeeping
   * @param maxQuotaResultSize maximum response size permitted by the quota
   * @param maxResults maximum number of Results to return in this call (scan caching)
   * @param limitOfRows limit of complete rows for the whole scan, or non-positive for no limit
   * @param results output list the scanned Results are appended to
   * @param builder response builder to record scan progress state on
   * @param lastBlock in/out holder of the last block referenced, for response size accounting
   * @param context the rpc call context, or null
   * @throws IOException if the underlying scanner fails
   */
  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
      long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
      ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCallContext context)
      throws IOException {
    HRegion region = rsh.r;
    RegionScanner scanner = rsh.s;
    long maxResultSize;
    // The scanner's own max result size (when set) can only tighten the quota limit.
    if (scanner.getMaxResultSize() > 0) {
      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
    } else {
      maxResultSize = maxQuotaResultSize;
    }
    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
    // arbitrary 32. TODO: keep record of general size of results being returned.
    List<Cell> values = new ArrayList<>(32);
    region.startRegionOperation(Operation.SCAN);
    try {
      int numOfResults = 0;
      int numOfCompleteRows = 0;
      long before = EnvironmentEdgeManager.currentTime();
      synchronized (scanner) {
        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
        boolean clientHandlesPartials =
            request.hasClientHandlesPartials() && request.getClientHandlesPartials();
        boolean clientHandlesHeartbeats =
            request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();

        // On the server side we must ensure that the correct ordering of partial results is
        // returned to the client to allow them to properly reconstruct the partial results.
        // If the coprocessor host is adding to the result list, we cannot guarantee the
        // correct ordering of partial results and so we prevent partial results from being
        // formed.
        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
        boolean moreRows = false;

        // Heartbeat messages occur when the processing of the ScanRequest exceeds a
        // certain time threshold on the server. When the time threshold is exceeded, the
        // server stops the scan and sends back whatever Results it has accumulated within
        // that time period (may be empty). Since heartbeat messages have the potential to
        // create partial Results (in the event that the timeout occurs in the middle of a
        // row), we must only generate heartbeat messages when the client can handle both
        // heartbeats AND partials
        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;

        long timeLimit = getTimeLimit(controller, allowHeartbeatMessages);

        final LimitScope sizeScope =
            allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
        final LimitScope timeScope =
            allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;

        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();

        // Configure with limits for this RPC. Set keep progress true since size progress
        // towards size limit should be kept between calls to nextRaw
        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
        // of heap size occupied by cells(being read). Cell data means its key and value parts.
        contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize);
        contextBuilder.setBatchLimit(scanner.getBatch());
        contextBuilder.setTimeLimit(timeScope, timeLimit);
        contextBuilder.setTrackMetrics(trackMetrics);
        ScannerContext scannerContext = contextBuilder.build();
        boolean limitReached = false;
        while (numOfResults < maxResults) {
          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
          // batch limit is a limit on the number of cells per Result. Thus, if progress is
          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
          // reset the batch progress between nextRaw invocations since we don't want the
          // batch progress from previous calls to affect future calls
          scannerContext.setBatchProgress(0);

          // Collect values to be returned here
          moreRows = scanner.nextRaw(values, scannerContext);

          if (!values.isEmpty()) {
            if (limitOfRows > 0) {
              // First we need to check if the last result is partial and we have a row change. If
              // so then we need to increase the numOfCompleteRows.
              if (results.isEmpty()) {
                if (rsh.rowOfLastPartialResult != null &&
                    !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              } else {
                Result lastResult = results.get(results.size() - 1);
                if (lastResult.mayHaveMoreCellsInRow() &&
                    !CellUtil.matchingRows(values.get(0), lastResult.getRow())) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              }
              // checkLimitOfRows may have flipped moreResults to false; stop if it did.
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
            lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
            results.add(r);
            numOfResults++;
            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
              numOfCompleteRows++;
              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
          } else if (!moreRows && !results.isEmpty()) {
            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
            // last result is false. Otherwise it's possible that: the first nextRaw returned
            // because BATCH_LIMIT_REACHED (BTW it happened to exhaust all cells of the scan), so
            // the last result's mayHaveMoreCellsInRow will be true. while the following nextRaw
            // will return with moreRows=false, which means moreResultsInRegion would be false, it
            // will be a contradictory state (HBASE-21206).
            int lastIdx = results.size() - 1;
            Result r = results.get(lastIdx);
            if (r.mayHaveMoreCellsInRow()) {
              results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
            }
          }
          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
          boolean resultsLimitReached = numOfResults >= maxResults;
          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

          if (limitReached || !moreRows) {
            // We only want to mark a ScanResponse as a heartbeat message in the event that
            // there are more values to be read server side. If there aren't more values,
            // marking it as a heartbeat is wasteful because the client will need to issue
            // another ScanRequest only to realize that they already have all the values
            if (moreRows && timeLimitReached) {
              // Heartbeat messages occur when the time limit has been reached.
              builder.setHeartbeatMessage(true);
              if (rsh.needCursor) {
                Cell cursorCell = scannerContext.getLastPeekedCell();
                if (cursorCell != null) {
                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
                }
              }
            }
            break;
          }
          // Reuse the cell buffer for the next row.
          values.clear();
        }
        builder.setMoreResultsInRegion(moreRows);
        // Check to see if the client requested that we track metrics server side. If the
        // client requested metrics, retrieve the metrics from the scanner context.
        if (trackMetrics) {
          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();

          for (Entry<String, Long> entry : metrics.entrySet()) {
            pairBuilder.setName(entry.getKey());
            pairBuilder.setValue(entry.getValue());
            metricBuilder.addMetrics(pairBuilder.build());
          }

          builder.setScanMetrics(metricBuilder.build());
        }
      }
      long end = EnvironmentEdgeManager.currentTime();
      long responseCellSize = context != null ? context.getResponseCellSize() : 0;
      region.getMetrics().updateScanTime(end - before);
      if (regionServer.metricsRegionServer != null) {
        regionServer.metricsRegionServer.updateScanSize(
            region.getTableDescriptor().getTableName(), responseCellSize);
        regionServer.metricsRegionServer.updateScanTime(
            region.getTableDescriptor().getTableName(), end - before);
      }
    } finally {
      region.closeRegionOperation();
    }
    // coprocessor postNext hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
    }
  }
3317
3318  /**
3319   * Scan data in a table.
3320   *
3321   * @param controller the RPC controller
3322   * @param request the scan request
3323   * @throws ServiceException
3324   */
3325  @Override
3326  public ScanResponse scan(final RpcController controller, final ScanRequest request)
3327      throws ServiceException {
3328    if (controller != null && !(controller instanceof HBaseRpcController)) {
3329      throw new UnsupportedOperationException(
3330          "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
3331    }
3332    if (!request.hasScannerId() && !request.hasScan()) {
3333      throw new ServiceException(
3334          new DoNotRetryIOException("Missing required input: scannerId or scan"));
3335    }
3336    try {
3337      checkOpen();
3338    } catch (IOException e) {
3339      if (request.hasScannerId()) {
3340        String scannerName = Long.toString(request.getScannerId());
3341        if (LOG.isDebugEnabled()) {
3342          LOG.debug(
3343            "Server shutting down and client tried to access missing scanner " + scannerName);
3344        }
3345        if (regionServer.leases != null) {
3346          try {
3347            regionServer.leases.cancelLease(scannerName);
3348          } catch (LeaseException le) {
3349            // No problem, ignore
3350            if (LOG.isTraceEnabled()) {
3351              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
3352            }
3353          }
3354        }
3355      }
3356      throw new ServiceException(e);
3357    }
3358    requestCount.increment();
3359    rpcScanRequestCount.increment();
3360    RegionScannerHolder rsh;
3361    ScanResponse.Builder builder = ScanResponse.newBuilder();
3362    try {
3363      if (request.hasScannerId()) {
3364        // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
3365        // for more details.
3366        builder.setScannerId(request.getScannerId());
3367        rsh = getRegionScanner(request);
3368      } else {
3369        rsh = newRegionScanner(request, builder);
3370      }
3371    } catch (IOException e) {
3372      if (e == SCANNER_ALREADY_CLOSED) {
3373        // Now we will close scanner automatically if there are no more results for this region but
3374        // the old client will still send a close request to us. Just ignore it and return.
3375        return builder.build();
3376      }
3377      throw new ServiceException(e);
3378    }
3379    HRegion region = rsh.r;
3380    String scannerName = rsh.scannerName;
3381    Leases.Lease lease;
3382    try {
3383      // Remove lease while its being processed in server; protects against case
3384      // where processing of request takes > lease expiration time.
3385      lease = regionServer.leases.removeLease(scannerName);
3386    } catch (LeaseException e) {
3387      throw new ServiceException(e);
3388    }
3389    if (request.hasRenew() && request.getRenew()) {
3390      // add back and return
3391      addScannerLeaseBack(lease);
3392      try {
3393        checkScanNextCallSeq(request, rsh);
3394      } catch (OutOfOrderScannerNextException e) {
3395        throw new ServiceException(e);
3396      }
3397      return builder.build();
3398    }
3399    OperationQuota quota;
3400    try {
3401      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
3402    } catch (IOException e) {
3403      addScannerLeaseBack(lease);
3404      throw new ServiceException(e);
3405    }
3406    try {
3407      checkScanNextCallSeq(request, rsh);
3408    } catch (OutOfOrderScannerNextException e) {
3409      addScannerLeaseBack(lease);
3410      throw new ServiceException(e);
3411    }
3412    // Now we have increased the next call sequence. If we give client an error, the retry will
3413    // never success. So we'd better close the scanner and return a DoNotRetryIOException to client
3414    // and then client will try to open a new scanner.
3415    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
3416    int rows; // this is scan.getCaching
3417    if (request.hasNumberOfRows()) {
3418      rows = request.getNumberOfRows();
3419    } else {
3420      rows = closeScanner ? 0 : 1;
3421    }
3422    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
3423    // now let's do the real scan.
3424    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
3425    RegionScanner scanner = rsh.s;
3426    // this is the limit of rows for this scan, if we the number of rows reach this value, we will
3427    // close the scanner.
3428    int limitOfRows;
3429    if (request.hasLimitOfRows()) {
3430      limitOfRows = request.getLimitOfRows();
3431    } else {
3432      limitOfRows = -1;
3433    }
3434    MutableObject<Object> lastBlock = new MutableObject<>();
3435    boolean scannerClosed = false;
3436    try {
3437      List<Result> results = new ArrayList<>(Math.min(rows, 512));
3438      if (rows > 0) {
3439        boolean done = false;
3440        // Call coprocessor. Get region info from scanner.
3441        if (region.getCoprocessorHost() != null) {
3442          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
3443          if (!results.isEmpty()) {
3444            for (Result r : results) {
3445              lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
3446            }
3447          }
3448          if (bypass != null && bypass.booleanValue()) {
3449            done = true;
3450          }
3451        }
3452        if (!done) {
3453          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
3454            results, builder, lastBlock, context);
3455        } else {
3456          builder.setMoreResultsInRegion(!results.isEmpty());
3457        }
3458      } else {
3459        // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
3460        builder.setMoreResultsInRegion(true);
3461      }
3462
3463      quota.addScanResult(results);
3464      addResults(builder, results, (HBaseRpcController) controller,
3465        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
3466        isClientCellBlockSupport(context));
3467      if (scanner.isFilterDone() && results.isEmpty()) {
3468        // If the scanner's filter - if any - is done with the scan
3469        // only set moreResults to false if the results is empty. This is used to keep compatible
3470        // with the old scan implementation where we just ignore the returned results if moreResults
3471        // is false. Can remove the isEmpty check after we get rid of the old implementation.
3472        builder.setMoreResults(false);
3473      }
3474      // Later we may close the scanner depending on this flag so here we need to make sure that we
3475      // have already set this flag.
3476      assert builder.hasMoreResultsInRegion();
3477      // we only set moreResults to false in the above code, so set it to true if we haven't set it
3478      // yet.
3479      if (!builder.hasMoreResults()) {
3480        builder.setMoreResults(true);
3481      }
3482      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
3483        // Record the last cell of the last result if it is a partial result
3484        // We need this to calculate the complete rows we have returned to client as the
3485        // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
3486        // current row. We may filter out all the remaining cells for the current row and just
3487        // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
3488        // check for row change.
3489        Result lastResult = results.get(results.size() - 1);
3490        if (lastResult.mayHaveMoreCellsInRow()) {
3491          rsh.rowOfLastPartialResult = lastResult.getRow();
3492        } else {
3493          rsh.rowOfLastPartialResult = null;
3494        }
3495      }
3496      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
3497        scannerClosed = true;
3498        closeScanner(region, scanner, scannerName, context);
3499      }
3500      return builder.build();
3501    } catch (IOException e) {
3502      try {
3503        // scanner is closed here
3504        scannerClosed = true;
3505        // The scanner state might be left in a dirty state, so we will tell the Client to
3506        // fail this RPC and close the scanner while opening up another one from the start of
3507        // row that the client has last seen.
3508        closeScanner(region, scanner, scannerName, context);
3509
3510        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
3511        // used in two different semantics.
3512        // (1) The first is to close the client scanner and bubble up the exception all the way
3513        // to the application. This is preferred when the exception is really un-recoverable
3514        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
3515        // bucket usually.
3516        // (2) Second semantics is to close the current region scanner only, but continue the
3517        // client scanner by overriding the exception. This is usually UnknownScannerException,
3518        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
3519        // application-level ClientScanner has to continue without bubbling up the exception to
3520        // the client. See ClientScanner code to see how it deals with these special exceptions.
3521        if (e instanceof DoNotRetryIOException) {
3522          throw e;
3523        }
3524
3525        // If it is a FileNotFoundException, wrap as a
3526        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
3527        if (e instanceof FileNotFoundException) {
3528          throw new DoNotRetryIOException(e);
3529        }
3530
3531        // We closed the scanner already. Instead of throwing the IOException, and client
3532        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
3533        // a special exception to save an RPC.
3534        if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
3535          // 1.4.0+ clients know how to handle
3536          throw new ScannerResetException("Scanner is closed on the server-side", e);
3537        } else {
3538          // older clients do not know about SRE. Just throw USE, which they will handle
3539          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
3540              + " scanner state for clients older than 1.3.", e);
3541        }
3542      } catch (IOException ioe) {
3543        throw new ServiceException(ioe);
3544      }
3545    } finally {
3546      if (!scannerClosed) {
3547        // Adding resets expiration time on lease.
3548        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
3549        if (context != null) {
3550          context.setCallBack(rsh.shippedCallback);
3551        } else {
3552          // When context != null, adding back the lease will be done in callback set above.
3553          addScannerLeaseBack(lease);
3554        }
3555      }
3556      quota.close();
3557    }
3558  }
3559
3560  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
3561      RpcCallContext context) throws IOException {
3562    if (region.getCoprocessorHost() != null) {
3563      if (region.getCoprocessorHost().preScannerClose(scanner)) {
3564        // bypass the actual close.
3565        return;
3566      }
3567    }
3568    RegionScannerHolder rsh = scanners.remove(scannerName);
3569    if (rsh != null) {
3570      if (context != null) {
3571        context.setCallBack(rsh.closeCallBack);
3572      } else {
3573        rsh.s.close();
3574      }
3575      if (region.getCoprocessorHost() != null) {
3576        region.getCoprocessorHost().postScannerClose(scanner);
3577      }
3578      closedScanners.put(scannerName, scannerName);
3579    }
3580  }
3581
  /**
   * Executes a region-server-scoped coprocessor service call by delegating to the region server.
   * @throws ServiceException if the pre-check rejects the call or the service invocation fails
   */
  @Override
  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
      CoprocessorServiceRequest request) throws ServiceException {
    rpcPreCheck("execRegionServerService");
    return regionServer.execRegionServerService(controller, request);
  }
3588
3589  @Override
3590  public UpdateConfigurationResponse updateConfiguration(
3591      RpcController controller, UpdateConfigurationRequest request)
3592      throws ServiceException {
3593    try {
3594      this.regionServer.updateConfiguration();
3595    } catch (Exception e) {
3596      throw new ServiceException(e);
3597    }
3598    return UpdateConfigurationResponse.getDefaultInstance();
3599  }
3600
3601  @Override
3602  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(
3603      RpcController controller, GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
3604    try {
3605      final RegionServerSpaceQuotaManager manager =
3606          regionServer.getRegionServerSpaceQuotaManager();
3607      final GetSpaceQuotaSnapshotsResponse.Builder builder =
3608          GetSpaceQuotaSnapshotsResponse.newBuilder();
3609      if (manager != null) {
3610        final Map<TableName,SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
3611        for (Entry<TableName,SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
3612          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
3613              .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
3614              .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue()))
3615              .build());
3616        }
3617      }
3618      return builder.build();
3619    } catch (Exception e) {
3620      throw new ServiceException(e);
3621    }
3622  }
3623
3624  @Override
3625  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
3626      ClearRegionBlockCacheRequest request) {
3627    ClearRegionBlockCacheResponse.Builder builder =
3628        ClearRegionBlockCacheResponse.newBuilder();
3629    CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
3630    List<HRegion> regions = getRegions(request.getRegionList(), stats);
3631    for (HRegion region : regions) {
3632      try {
3633        stats = stats.append(this.regionServer.clearRegionBlockCache(region));
3634      } catch (Exception e) {
3635        stats.addException(region.getRegionInfo().getRegionName(), e);
3636      }
3637    }
3638    stats.withMaxCacheSize(regionServer.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
3639    return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
3640  }
3641
3642  private void executeOpenRegionProcedures(OpenRegionRequest request,
3643      Map<TableName, TableDescriptor> tdCache) {
3644    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
3645    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
3646      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
3647      TableDescriptor tableDesc = tdCache.get(regionInfo.getTable());
3648      if (tableDesc == null) {
3649        try {
3650          tableDesc = regionServer.getTableDescriptors().get(regionInfo.getTable());
3651        } catch (IOException e) {
3652          // Here we do not fail the whole method since we also need deal with other
3653          // procedures, and we can not ignore this one, so we still schedule a
3654          // AssignRegionHandler and it will report back to master if we still can not get the
3655          // TableDescriptor.
3656          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
3657            regionInfo.getTable(), e);
3658        }
3659      }
3660      if (regionOpenInfo.getFavoredNodesCount() > 0) {
3661        regionServer.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
3662          regionOpenInfo.getFavoredNodesList());
3663      }
3664      long procId = regionOpenInfo.getOpenProcId();
3665      if (regionServer.submitRegionProcedure(procId)) {
3666        regionServer.executorService.submit(AssignRegionHandler
3667            .create(regionServer, regionInfo, procId, tableDesc,
3668                masterSystemTime));
3669      }
3670    }
3671  }
3672
3673  private void executeCloseRegionProcedures(CloseRegionRequest request) {
3674    String encodedName;
3675    try {
3676      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
3677    } catch (DoNotRetryIOException e) {
3678      throw new UncheckedIOException("Should not happen", e);
3679    }
3680    ServerName destination = request.hasDestinationServer() ?
3681        ProtobufUtil.toServerName(request.getDestinationServer()) :
3682        null;
3683    long procId = request.getCloseProcId();
3684    if (regionServer.submitRegionProcedure(procId)) {
3685      regionServer.executorService.submit(UnassignRegionHandler
3686          .create(regionServer, encodedName, procId, false, destination));
3687    }
3688  }
3689
3690  private void executeProcedures(RemoteProcedureRequest request) {
3691    RSProcedureCallable callable;
3692    try {
3693      callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
3694        .getDeclaredConstructor().newInstance();
3695    } catch (Exception e) {
3696      LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(),
3697        request.getProcId(), e);
3698      regionServer.remoteProcedureComplete(request.getProcId(), e);
3699      return;
3700    }
3701    callable.init(request.getProcData().toByteArray(), regionServer);
3702    LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
3703    regionServer.executeProcedure(request.getProcId(), callable);
3704  }
3705
3706  @Override
3707  @QosPriority(priority = HConstants.ADMIN_QOS)
3708  public ExecuteProceduresResponse executeProcedures(RpcController controller,
3709      ExecuteProceduresRequest request) throws ServiceException {
3710    try {
3711      checkOpen();
3712      throwOnWrongStartCode(request);
3713      regionServer.getRegionServerCoprocessorHost().preExecuteProcedures();
3714      if (request.getOpenRegionCount() > 0) {
3715        // Avoid reading from the TableDescritor every time(usually it will read from the file
3716        // system)
3717        Map<TableName, TableDescriptor> tdCache = new HashMap<>();
3718        request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
3719      }
3720      if (request.getCloseRegionCount() > 0) {
3721        request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
3722      }
3723      if (request.getProcCount() > 0) {
3724        request.getProcList().forEach(this::executeProcedures);
3725      }
3726      regionServer.getRegionServerCoprocessorHost().postExecuteProcedures();
3727      return ExecuteProceduresResponse.getDefaultInstance();
3728    } catch (IOException e) {
3729      throw new ServiceException(e);
3730    }
3731  }
3732
  /** Exposes the RPC server's scheduler; intended for tests only. */
  @VisibleForTesting
  public RpcScheduler getRpcScheduler() {
    return rpcServer.getScheduler();
  }
3737}