001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.io.UncheckedIOException;
024import java.lang.reflect.InvocationTargetException;
025import java.net.BindException;
026import java.net.InetSocketAddress;
027import java.nio.ByteBuffer;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.HashMap;
032import java.util.Iterator;
033import java.util.List;
034import java.util.Map;
035import java.util.Map.Entry;
036import java.util.NavigableMap;
037import java.util.Optional;
038import java.util.Set;
039import java.util.TreeSet;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ConcurrentMap;
042import java.util.concurrent.TimeUnit;
043import java.util.concurrent.atomic.AtomicBoolean;
044import java.util.concurrent.atomic.AtomicLong;
045import java.util.concurrent.atomic.LongAdder;
046import org.apache.commons.lang3.mutable.MutableObject;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.Path;
049import org.apache.hadoop.hbase.ByteBufferExtendedCell;
050import org.apache.hadoop.hbase.CacheEvictionStats;
051import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
052import org.apache.hadoop.hbase.Cell;
053import org.apache.hadoop.hbase.CellScannable;
054import org.apache.hadoop.hbase.CellScanner;
055import org.apache.hadoop.hbase.CellUtil;
056import org.apache.hadoop.hbase.CompareOperator;
057import org.apache.hadoop.hbase.DoNotRetryIOException;
058import org.apache.hadoop.hbase.DroppedSnapshotException;
059import org.apache.hadoop.hbase.HBaseIOException;
060import org.apache.hadoop.hbase.HConstants;
061import org.apache.hadoop.hbase.MultiActionResultTooLarge;
062import org.apache.hadoop.hbase.NotServingRegionException;
063import org.apache.hadoop.hbase.PrivateCellUtil;
064import org.apache.hadoop.hbase.RegionTooBusyException;
065import org.apache.hadoop.hbase.Server;
066import org.apache.hadoop.hbase.ServerName;
067import org.apache.hadoop.hbase.TableName;
068import org.apache.hadoop.hbase.UnknownScannerException;
069import org.apache.hadoop.hbase.client.Append;
070import org.apache.hadoop.hbase.client.ConnectionUtils;
071import org.apache.hadoop.hbase.client.Delete;
072import org.apache.hadoop.hbase.client.Durability;
073import org.apache.hadoop.hbase.client.Get;
074import org.apache.hadoop.hbase.client.Increment;
075import org.apache.hadoop.hbase.client.Mutation;
076import org.apache.hadoop.hbase.client.Put;
077import org.apache.hadoop.hbase.client.RegionInfo;
078import org.apache.hadoop.hbase.client.RegionReplicaUtil;
079import org.apache.hadoop.hbase.client.Result;
080import org.apache.hadoop.hbase.client.Row;
081import org.apache.hadoop.hbase.client.RowMutations;
082import org.apache.hadoop.hbase.client.Scan;
083import org.apache.hadoop.hbase.client.TableDescriptor;
084import org.apache.hadoop.hbase.client.VersionInfoUtil;
085import org.apache.hadoop.hbase.conf.ConfigurationObserver;
086import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
087import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
088import org.apache.hadoop.hbase.exceptions.ScannerResetException;
089import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
090import org.apache.hadoop.hbase.filter.ByteArrayComparable;
091import org.apache.hadoop.hbase.filter.Filter;
092import org.apache.hadoop.hbase.io.ByteBuffAllocator;
093import org.apache.hadoop.hbase.io.TimeRange;
094import org.apache.hadoop.hbase.io.hfile.BlockCache;
095import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
096import org.apache.hadoop.hbase.ipc.HBaseRpcController;
097import org.apache.hadoop.hbase.ipc.PriorityFunction;
098import org.apache.hadoop.hbase.ipc.QosPriority;
099import org.apache.hadoop.hbase.ipc.RpcCallContext;
100import org.apache.hadoop.hbase.ipc.RpcCallback;
101import org.apache.hadoop.hbase.ipc.RpcScheduler;
102import org.apache.hadoop.hbase.ipc.RpcServer;
103import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
104import org.apache.hadoop.hbase.ipc.RpcServerFactory;
105import org.apache.hadoop.hbase.ipc.RpcServerInterface;
106import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
107import org.apache.hadoop.hbase.ipc.ServerRpcController;
108import org.apache.hadoop.hbase.log.HBaseMarkers;
109import org.apache.hadoop.hbase.master.HMaster;
110import org.apache.hadoop.hbase.master.MasterRpcServices;
111import org.apache.hadoop.hbase.net.Address;
112import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
113import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
114import org.apache.hadoop.hbase.quotas.OperationQuota;
115import org.apache.hadoop.hbase.quotas.QuotaUtil;
116import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
117import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
118import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
119import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
120import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
121import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease;
122import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException;
123import org.apache.hadoop.hbase.regionserver.Region.Operation;
124import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
125import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
126import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler;
127import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
128import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
129import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
130import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
131import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
132import org.apache.hadoop.hbase.security.Superusers;
133import org.apache.hadoop.hbase.security.User;
134import org.apache.hadoop.hbase.security.access.AccessChecker;
135import org.apache.hadoop.hbase.security.access.NoopAccessChecker;
136import org.apache.hadoop.hbase.security.access.Permission;
137import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
138import org.apache.hadoop.hbase.util.Bytes;
139import org.apache.hadoop.hbase.util.DNS;
140import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
141import org.apache.hadoop.hbase.util.Pair;
142import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
143import org.apache.hadoop.hbase.wal.WAL;
144import org.apache.hadoop.hbase.wal.WALEdit;
145import org.apache.hadoop.hbase.wal.WALKey;
146import org.apache.hadoop.hbase.wal.WALSplitUtil;
147import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
148import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
149import org.apache.yetus.audience.InterfaceAudience;
150import org.apache.zookeeper.KeeperException;
151import org.slf4j.Logger;
152import org.slf4j.LoggerFactory;
153
154import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
155import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
156import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
157import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
158import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
159import org.apache.hbase.thirdparty.com.google.protobuf.Message;
160import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
161import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
162import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
163import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
164import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
165
166import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
167import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
168import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
259
260/**
261 * Implements the regionserver RPC services.
262 */
263@InterfaceAudience.Private
264@SuppressWarnings("deprecation")
265public class RSRpcServices implements HBaseRPCErrorHandler,
266    AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
267    ConfigurationObserver {
  protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);

  /** RPC scheduler to use for the region server. */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

  /** RPC scheduler to use for the master. */
  public static final String MASTER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.master.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
      "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Number of rows in a batch operation above which a warning will be logged.
   */
  static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
  /**
   * Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
   */
  static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Server to handle client requests.
  final RpcServerInterface rpcServer;
  // Socket address this RPC service is bound to.
  final InetSocketAddress isa;

  // The region server that owns this RPC service.
  @VisibleForTesting
  protected final HRegionServer regionServer;
  // Upper bound on the size of results returned by a single scan RPC
  // (presumably in bytes, configured at construction — confirm at initialization site).
  private final long maxScannerResultSize;

  // The reference to the priority extraction function
  private final PriorityFunction priority;

  // Generates ids for client scanners opened against this server
  // (assigned after construction — presumably once the server name is known).
  private ScannerIdGenerator scannerIdGenerator;
  // All currently open region scanners, keyed by scanner name.
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
  // which may send next or close request to a region scanner which has already been exhausted. The
  // entries will be removed automatically after scannerLeaseTimeoutPeriod.
  private final Cache<String, String> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;
  /**
   * Whether we should reject requests with very high no of rows i.e. beyond threshold
   * defined by rowSizeWarnThreshold
   */
  private final boolean rejectRowsWithSizeOverThreshold;

  // NOTE(review): presumably set while a clear-compaction-queues request is in
  // flight to reject concurrent requests — verify at the usage sites.
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  // Permission checker for RPC callers; NoopAccessChecker when authorization is off.
  private AccessChecker accessChecker;
  // Watches ZooKeeper for permission changes (used alongside accessChecker).
  private ZKPermissionWatcher zkPermissionWatcher;

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below
   * booleans to selectively enable/disable either Admin or Client Service (Rare is the case
   * where you would ever turn off one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
      "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
      "hbase.regionserver.client.executorService";
383
384  /**
385   * An Rpc callback for closing a RegionScanner.
386   */
387  private static final class RegionScannerCloseCallBack implements RpcCallback {
388
389    private final RegionScanner scanner;
390
391    public RegionScannerCloseCallBack(RegionScanner scanner) {
392      this.scanner = scanner;
393    }
394
395    @Override
396    public void run() throws IOException {
397      this.scanner.close();
398    }
399  }
400
401  /**
402   * An Rpc callback for doing shipped() call on a RegionScanner.
403   */
404  private class RegionScannerShippedCallBack implements RpcCallback {
405
406    private final String scannerName;
407    private final Shipper shipper;
408    private final Lease lease;
409
410    public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
411      this.scannerName = scannerName;
412      this.shipper = shipper;
413      this.lease = lease;
414    }
415
416    @Override
417    public void run() throws IOException {
418      this.shipper.shipped();
419      // We're done. On way out re-add the above removed lease. The lease was temp removed for this
420      // Rpc call and we are at end of the call now. Time to add it back.
421      if (scanners.containsKey(scannerName)) {
422        if (lease != null) {
423          regionServer.getLeaseManager().addLease(lease);
424        }
425      }
426    }
427  }
428
429  /**
430   * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
431   * completion of multiGets.
432   */
433   static class RegionScannersCloseCallBack implements RpcCallback {
434    private final List<RegionScanner> scanners = new ArrayList<>();
435
436    public void addScanner(RegionScanner scanner) {
437      this.scanners.add(scanner);
438    }
439
440    @Override
441    public void run() {
442      for (RegionScanner scanner : scanners) {
443        try {
444          scanner.close();
445        } catch (IOException e) {
446          LOG.error("Exception while closing the scanner " + scanner, e);
447        }
448      }
449    }
450  }
451
452  /**
453   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
454   */
455  private static final class RegionScannerHolder {
456
457    private final AtomicLong nextCallSeq = new AtomicLong(0);
458    private final String scannerName;
459    private final RegionScanner s;
460    private final HRegion r;
461    private final RpcCallback closeCallBack;
462    private final RpcCallback shippedCallback;
463    private byte[] rowOfLastPartialResult;
464    private boolean needCursor;
465
466    public RegionScannerHolder(String scannerName, RegionScanner s, HRegion r,
467        RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor) {
468      this.scannerName = scannerName;
469      this.s = s;
470      this.r = r;
471      this.closeCallBack = closeCallBack;
472      this.shippedCallback = shippedCallback;
473      this.needCursor = needCursor;
474    }
475
476    public long getNextCallSeq() {
477      return nextCallSeq.get();
478    }
479
480    public boolean incNextCallSeq(long currentSeq) {
481      // Use CAS to prevent multiple scan request running on the same scanner.
482      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
483    }
484  }
485
486  /**
487   * Instantiated as a scanner lease. If the lease times out, the scanner is
488   * closed
489   */
490  private class ScannerListener implements LeaseListener {
491    private final String scannerName;
492
493    ScannerListener(final String n) {
494      this.scannerName = n;
495    }
496
497    @Override
498    public void leaseExpired() {
499      RegionScannerHolder rsh = scanners.remove(this.scannerName);
500      if (rsh != null) {
501        RegionScanner s = rsh.s;
502        LOG.info("Scanner " + this.scannerName + " lease expired on region "
503          + s.getRegionInfo().getRegionNameAsString());
504        HRegion region = null;
505        try {
506          region = regionServer.getRegion(s.getRegionInfo().getRegionName());
507          if (region != null && region.getCoprocessorHost() != null) {
508            region.getCoprocessorHost().preScannerClose(s);
509          }
510        } catch (IOException e) {
511          LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
512        } finally {
513          try {
514            s.close();
515            if (region != null && region.getCoprocessorHost() != null) {
516              region.getCoprocessorHost().postScannerClose(s);
517            }
518          } catch (IOException e) {
519            LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
520          }
521        }
522      } else {
523        LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
524          " scanner found, hence no chance to close that related scanner!");
525      }
526    }
527  }
528
529  private static ResultOrException getResultOrException(final ClientProtos.Result r,
530                                                        final int index){
531    return getResultOrException(ResponseConverter.buildActionResult(r), index);
532  }
533
534  private static ResultOrException getResultOrException(final Exception e, final int index) {
535    return getResultOrException(ResponseConverter.buildActionResult(e), index);
536  }
537
538  private static ResultOrException getResultOrException(
539      final ResultOrException.Builder builder, final int index) {
540    return builder.setIndex(index).build();
541  }
542
543  /**
544   * Checks for the following pre-checks in order:
545   * <ol>
546   *   <li>RegionServer is running</li>
547   *   <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
548   * </ol>
549   * @param requestName name of rpc request. Used in reporting failures to provide context.
550   * @throws ServiceException If any of the above listed pre-check fails.
551   */
552  private void rpcPreCheck(String requestName) throws ServiceException {
553    try {
554      checkOpen();
555      requirePermission(requestName, Permission.Action.ADMIN);
556    } catch (IOException ioe) {
557      throw new ServiceException(ioe);
558    }
559  }
560
561  /**
562   * Starts the nonce operation for a mutation, if needed.
563   * @param mutation Mutation.
564   * @param nonceGroup Nonce group from the request.
565   * @return whether to proceed this mutation.
566   */
567  private boolean startNonceOperation(final MutationProto mutation, long nonceGroup)
568      throws IOException {
569    if (regionServer.nonceManager == null || !mutation.hasNonce()) return true;
570    boolean canProceed = false;
571    try {
572      canProceed = regionServer.nonceManager.startOperation(
573        nonceGroup, mutation.getNonce(), regionServer);
574    } catch (InterruptedException ex) {
575      throw new InterruptedIOException("Nonce start operation interrupted");
576    }
577    return canProceed;
578  }
579
580  /**
581   * Ends nonce operation for a mutation, if needed.
582   * @param mutation Mutation.
583   * @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
584   * @param success Whether the operation for this nonce has succeeded.
585   */
586  private void endNonceOperation(final MutationProto mutation,
587      long nonceGroup, boolean success) {
588    if (regionServer.nonceManager != null && mutation.hasNonce()) {
589      regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
590    }
591  }
592
593  private boolean isClientCellBlockSupport(RpcCallContext context) {
594    return context != null && context.isClientCellBlockSupported();
595  }
596
597  private void addResult(final MutateResponse.Builder builder, final Result result,
598      final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
599    if (result == null) return;
600    if (clientCellBlockSupported) {
601      builder.setResult(ProtobufUtil.toResultNoData(result));
602      rpcc.setCellScanner(result.cellScanner());
603    } else {
604      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
605      builder.setResult(pbr);
606    }
607  }
608
609  private void addResults(ScanResponse.Builder builder, List<Result> results,
610      HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
611    builder.setStale(!isDefaultRegion);
612    if (results.isEmpty()) {
613      return;
614    }
615    if (clientCellBlockSupported) {
616      for (Result res : results) {
617        builder.addCellsPerResult(res.size());
618        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
619      }
620      controller.setCellScanner(CellUtil.createCellScanner(results));
621    } else {
622      for (Result res : results) {
623        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
624        builder.addResults(pbr);
625      }
626    }
627  }
628
629  /**
630   * Mutate a list of rows atomically.
631   * @param cellScanner if non-null, the mutation data -- the Cell content.
632   */
633  private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
634    final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
635    CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange,
636    RegionActionResult.Builder builder, ActivePolicyEnforcement spaceQuotaEnforcement)
637    throws IOException {
638    int countOfCompleteMutation = 0;
639    try {
640      if (!region.getRegionInfo().isMetaRegion()) {
641        regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
642      }
643      RowMutations rm = null;
644      int i = 0;
645      ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
646        ClientProtos.ResultOrException.newBuilder();
647      for (ClientProtos.Action action: actions) {
648        if (action.hasGet()) {
649          throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
650            action.getGet());
651        }
652        MutationType type = action.getMutation().getMutateType();
653        if (rm == null) {
654          rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size());
655        }
656        switch (type) {
657          case PUT:
658            Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
659            ++countOfCompleteMutation;
660            checkCellSizeLimit(region, put);
661            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
662            rm.add(put);
663            break;
664          case DELETE:
665            Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
666            ++countOfCompleteMutation;
667            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
668            rm.add(del);
669            break;
670          default:
671            throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
672        }
673        // To unify the response format with doNonAtomicRegionMutation and read through client's
674        // AsyncProcess we have to add an empty result instance per operation
675        resultOrExceptionOrBuilder.clear();
676        resultOrExceptionOrBuilder.setIndex(i++);
677        builder.addResultOrException(
678          resultOrExceptionOrBuilder.build());
679      }
680
681      if (filter != null) {
682        return region.checkAndRowMutate(row, filter, timeRange, rm);
683      } else {
684        return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm);
685      }
686    } finally {
687      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
688      // even if the malformed cells are not skipped.
689      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
690        skipCellsForMutation(actions.get(i), cellScanner);
691      }
692    }
693  }
694
695  /**
696   * Execute an append mutation.
697   *
698   * @return result to return to client if default operation should be
699   * bypassed as indicated by RegionObserver, null otherwise
700   */
701  private Result append(final HRegion region, final OperationQuota quota,
702      final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
703      ActivePolicyEnforcement spaceQuota)
704      throws IOException {
705    long before = EnvironmentEdgeManager.currentTime();
706    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
707    checkCellSizeLimit(region, append);
708    spaceQuota.getPolicyEnforcement(region).check(append);
709    quota.addMutation(append);
710    Result r = null;
711    if (region.getCoprocessorHost() != null) {
712      r = region.getCoprocessorHost().preAppend(append);
713    }
714    if (r == null) {
715      boolean canProceed = startNonceOperation(mutation, nonceGroup);
716      boolean success = false;
717      try {
718        long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
719        if (canProceed) {
720          r = region.append(append, nonceGroup, nonce);
721        } else {
722          // convert duplicate append to get
723          List<Cell> results = region.get(toGet(append), false, nonceGroup, nonce);
724          r = Result.create(results);
725        }
726        success = true;
727      } finally {
728        if (canProceed) {
729          endNonceOperation(mutation, nonceGroup, success);
730        }
731      }
732      if (region.getCoprocessorHost() != null) {
733        r = region.getCoprocessorHost().postAppend(append, r);
734      }
735    }
736    if (regionServer.getMetrics() != null) {
737      regionServer.getMetrics().updateAppend(
738          region.getTableDescriptor().getTableName(),
739        EnvironmentEdgeManager.currentTime() - before);
740    }
741    return r == null ? Result.EMPTY_RESULT : r;
742  }
743
744  /**
745   * Execute an increment mutation.
746   */
747  private Result increment(final HRegion region, final OperationQuota quota,
748      final MutationProto mutation, final CellScanner cells, long nonceGroup,
749      ActivePolicyEnforcement spaceQuota)
750      throws IOException {
751    long before = EnvironmentEdgeManager.currentTime();
752    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
753    checkCellSizeLimit(region, increment);
754    spaceQuota.getPolicyEnforcement(region).check(increment);
755    quota.addMutation(increment);
756    Result r = null;
757    if (region.getCoprocessorHost() != null) {
758      r = region.getCoprocessorHost().preIncrement(increment);
759    }
760    if (r == null) {
761      boolean canProceed = startNonceOperation(mutation, nonceGroup);
762      boolean success = false;
763      try {
764        long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
765        if (canProceed) {
766          r = region.increment(increment, nonceGroup, nonce);
767        } else {
768          // convert duplicate increment to get
769          List<Cell> results = region.get(toGet(increment), false, nonceGroup, nonce);
770          r = Result.create(results);
771        }
772        success = true;
773      } finally {
774        if (canProceed) {
775          endNonceOperation(mutation, nonceGroup, success);
776        }
777      }
778      if (region.getCoprocessorHost() != null) {
779        r = region.getCoprocessorHost().postIncrement(increment, r);
780      }
781    }
782    final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
783    if (metricsRegionServer != null) {
784      metricsRegionServer.updateIncrement(
785          region.getTableDescriptor().getTableName(),
786          EnvironmentEdgeManager.currentTime() - before);
787    }
788    return r == null ? Result.EMPTY_RESULT : r;
789  }
790
791  private static Get toGet(final Mutation mutation) throws IOException {
792    if(!(mutation instanceof Increment) && !(mutation instanceof Append)) {
793      throw new AssertionError("mutation must be a instance of Increment or Append");
794    }
795    Get get = new Get(mutation.getRow());
796    CellScanner cellScanner = mutation.cellScanner();
797    while (!cellScanner.advance()) {
798      Cell cell = cellScanner.current();
799      get.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
800    }
801    if (mutation instanceof Increment) {
802      // Increment
803      Increment increment = (Increment) mutation;
804      get.setTimeRange(increment.getTimeRange().getMin(), increment.getTimeRange().getMax());
805    } else {
806      // Append
807      Append append = (Append) mutation;
808      get.setTimeRange(append.getTimeRange().getMin(), append.getTimeRange().getMax());
809    }
810    for (Entry<String, byte[]> entry : mutation.getAttributesMap().entrySet()) {
811      get.setAttribute(entry.getKey(), entry.getValue());
812    }
813    return get;
814  }
815
816  /**
817   * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
818   * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
819   * @param cellsToReturn  Could be null. May be allocated in this method.  This is what this
820   * method returns as a 'result'.
821   * @param closeCallBack the callback to be used with multigets
822   * @param context the current RpcCallContext
823   * @return Return the <code>cellScanner</code> passed
824   */
825  private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
826      final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
827      final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
828      final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
829      ActivePolicyEnforcement spaceQuotaEnforcement) {
830    // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than do
831    // one at a time, we instead pass them in batch.  Be aware that the corresponding
832    // ResultOrException instance that matches each Put or Delete is then added down in the
833    // doNonAtomicBatchOp call.  We should be staying aligned though the Put and Delete are
834    // deferred/batched
835    List<ClientProtos.Action> mutations = null;
836    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
837    IOException sizeIOE = null;
838    Object lastBlock = null;
839    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = ResultOrException.newBuilder();
840    boolean hasResultOrException = false;
841    for (ClientProtos.Action action : actions.getActionList()) {
842      hasResultOrException = false;
843      resultOrExceptionBuilder.clear();
844      try {
845        Result r = null;
846
847        if (context != null
848            && context.isRetryImmediatelySupported()
849            && (context.getResponseCellSize() > maxQuotaResultSize
850              || context.getResponseBlockSize() + context.getResponseExceptionSize()
851              > maxQuotaResultSize)) {
852
853          // We're storing the exception since the exception and reason string won't
854          // change after the response size limit is reached.
855          if (sizeIOE == null ) {
856            // We don't need the stack un-winding do don't throw the exception.
857            // Throwing will kill the JVM's JIT.
858            //
859            // Instead just create the exception and then store it.
860            sizeIOE = new MultiActionResultTooLarge("Max size exceeded"
861                + " CellSize: " + context.getResponseCellSize()
862                + " BlockSize: " + context.getResponseBlockSize());
863
864            // Only report the exception once since there's only one request that
865            // caused the exception. Otherwise this number will dominate the exceptions count.
866            rpcServer.getMetrics().exception(sizeIOE);
867          }
868
869          // Now that there's an exception is known to be created
870          // use it for the response.
871          //
872          // This will create a copy in the builder.
873          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
874          resultOrExceptionBuilder.setException(pair);
875          context.incrementResponseExceptionSize(pair.getSerializedSize());
876          resultOrExceptionBuilder.setIndex(action.getIndex());
877          builder.addResultOrException(resultOrExceptionBuilder.build());
878          skipCellsForMutation(action, cellScanner);
879          continue;
880        }
881        if (action.hasGet()) {
882          long before = EnvironmentEdgeManager.currentTime();
883          ClientProtos.Get pbGet = action.getGet();
884          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
885          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
886          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
887          // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
888          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
889            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? " +
890                "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " +
891                "reverse Scan.");
892          }
893          try {
894            Get get = ProtobufUtil.toGet(pbGet);
895            if (context != null) {
896              r = get(get, (region), closeCallBack, context);
897            } else {
898              r = region.get(get);
899            }
900          } finally {
901            final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
902            if (metricsRegionServer != null) {
903              metricsRegionServer.updateGet(
904                  region.getTableDescriptor().getTableName(),
905                  EnvironmentEdgeManager.currentTime() - before);
906            }
907          }
908        } else if (action.hasServiceCall()) {
909          hasResultOrException = true;
910          com.google.protobuf.Message result =
911            execServiceOnRegion(region, action.getServiceCall());
912          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
913            ClientProtos.CoprocessorServiceResult.newBuilder();
914          resultOrExceptionBuilder.setServiceResult(
915            serviceResultBuilder.setValue(
916              serviceResultBuilder.getValueBuilder()
917                .setName(result.getClass().getName())
918                // TODO: Copy!!!
919                .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
920        } else if (action.hasMutation()) {
921          MutationType type = action.getMutation().getMutateType();
922          if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
923              !mutations.isEmpty()) {
924            // Flush out any Puts or Deletes already collected.
925            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
926              spaceQuotaEnforcement);
927            mutations.clear();
928          }
929          switch (type) {
930            case APPEND:
931              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
932                  spaceQuotaEnforcement);
933              break;
934            case INCREMENT:
935              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
936                  spaceQuotaEnforcement);
937              break;
938            case PUT:
939            case DELETE:
940              // Collect the individual mutations and apply in a batch
941              if (mutations == null) {
942                mutations = new ArrayList<>(actions.getActionCount());
943              }
944              mutations.add(action);
945              break;
946            default:
947              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
948          }
949        } else {
950          throw new HBaseIOException("Unexpected Action type");
951        }
952        if (r != null) {
953          ClientProtos.Result pbResult = null;
954          if (isClientCellBlockSupport(context)) {
955            pbResult = ProtobufUtil.toResultNoData(r);
956            //  Hard to guess the size here.  Just make a rough guess.
957            if (cellsToReturn == null) {
958              cellsToReturn = new ArrayList<>();
959            }
960            cellsToReturn.add(r);
961          } else {
962            pbResult = ProtobufUtil.toResult(r);
963          }
964          lastBlock = addSize(context, r, lastBlock);
965          hasResultOrException = true;
966          resultOrExceptionBuilder.setResult(pbResult);
967        }
968        // Could get to here and there was no result and no exception.  Presumes we added
969        // a Put or Delete to the collecting Mutations List for adding later.  In this
970        // case the corresponding ResultOrException instance for the Put or Delete will be added
971        // down in the doNonAtomicBatchOp method call rather than up here.
972      } catch (IOException ie) {
973        rpcServer.getMetrics().exception(ie);
974        hasResultOrException = true;
975        NameBytesPair pair = ResponseConverter.buildException(ie);
976        resultOrExceptionBuilder.setException(pair);
977        context.incrementResponseExceptionSize(pair.getSerializedSize());
978      }
979      if (hasResultOrException) {
980        // Propagate index.
981        resultOrExceptionBuilder.setIndex(action.getIndex());
982        builder.addResultOrException(resultOrExceptionBuilder.build());
983      }
984    }
985    // Finish up any outstanding mutations
986    if (!CollectionUtils.isEmpty(mutations)) {
987      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
988    }
989    return cellsToReturn;
990  }
991
992  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
993    if (r.maxCellSize > 0) {
994      CellScanner cells = m.cellScanner();
995      while (cells.advance()) {
996        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
997        if (size > r.maxCellSize) {
998          String msg = "Cell with size " + size + " exceeds limit of " + r.maxCellSize + " bytes";
999          LOG.debug(msg);
1000          throw new DoNotRetryIOException(msg);
1001        }
1002      }
1003    }
1004  }
1005
1006  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
1007    final OperationQuota quota, final List<ClientProtos.Action> mutations,
1008    final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement)
1009    throws IOException {
1010    // Just throw the exception. The exception will be caught and then added to region-level
1011    // exception for RegionAction. Leaving the null to action result is ok since the null
1012    // result is viewed as failure by hbase client. And the region-lever exception will be used
1013    // to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and
1014    // AsyncBatchRpcRetryingCaller#onComplete for more details.
1015    doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, true);
1016  }
1017
1018  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
1019    final OperationQuota quota, final List<ClientProtos.Action> mutations,
1020    final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
1021    try {
1022      doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, false);
1023    } catch (IOException e) {
1024      // Set the exception for each action. The mutations in same RegionAction are group to
1025      // different batch and then be processed individually. Hence, we don't set the region-level
1026      // exception here for whole RegionAction.
1027      for (Action mutation : mutations) {
1028        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
1029      }
1030    }
1031  }
1032
1033  /**
1034   * Execute a list of Put/Delete mutations.
1035   *
1036   * @param builder
1037   * @param region
1038   * @param mutations
1039   */
1040  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
1041      final OperationQuota quota, final List<ClientProtos.Action> mutations,
1042      final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
1043      throws IOException {
1044    Mutation[] mArray = new Mutation[mutations.size()];
1045    long before = EnvironmentEdgeManager.currentTime();
1046    boolean batchContainsPuts = false, batchContainsDelete = false;
1047    try {
1048      /** HBASE-17924
1049       * mutationActionMap is a map to map the relation between mutations and actions
1050       * since mutation array may have been reoredered.In order to return the right
1051       * result or exception to the corresponding actions, We need to know which action
1052       * is the mutation belong to. We can't sort ClientProtos.Action array, since they
1053       * are bonded to cellscanners.
1054       */
1055      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
1056      int i = 0;
1057      for (ClientProtos.Action action: mutations) {
1058        if (action.hasGet()) {
1059          throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
1060            action.getGet());
1061        }
1062        MutationProto m = action.getMutation();
1063        Mutation mutation;
1064        if (m.getMutateType() == MutationType.PUT) {
1065          mutation = ProtobufUtil.toPut(m, cells);
1066          batchContainsPuts = true;
1067        } else {
1068          mutation = ProtobufUtil.toDelete(m, cells);
1069          batchContainsDelete = true;
1070        }
1071        mutationActionMap.put(mutation, action);
1072        mArray[i++] = mutation;
1073        checkCellSizeLimit(region, mutation);
1074        // Check if a space quota disallows this mutation
1075        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
1076        quota.addMutation(mutation);
1077      }
1078
1079      if (!region.getRegionInfo().isMetaRegion()) {
1080        regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
1081      }
1082
1083      // HBASE-17924
1084      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
1085      // order is preserved as its expected from the client
1086      if (!atomic) {
1087        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
1088      }
1089
1090      OperationStatus[] codes = region.batchMutate(mArray, atomic, HConstants.NO_NONCE,
1091        HConstants.NO_NONCE);
1092      for (i = 0; i < codes.length; i++) {
1093        Mutation currentMutation = mArray[i];
1094        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
1095        int index = currentAction.hasIndex() || !atomic ? currentAction.getIndex() : i;
1096        Exception e = null;
1097        switch (codes[i].getOperationStatusCode()) {
1098          case BAD_FAMILY:
1099            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
1100            builder.addResultOrException(getResultOrException(e, index));
1101            break;
1102
1103          case SANITY_CHECK_FAILURE:
1104            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
1105            builder.addResultOrException(getResultOrException(e, index));
1106            break;
1107
1108          default:
1109            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
1110            builder.addResultOrException(getResultOrException(e, index));
1111            break;
1112
1113          case SUCCESS:
1114            builder.addResultOrException(getResultOrException(
1115              ClientProtos.Result.getDefaultInstance(), index));
1116            break;
1117
1118          case STORE_TOO_BUSY:
1119            e = new RegionTooBusyException(codes[i].getExceptionMsg());
1120            builder.addResultOrException(getResultOrException(e, index));
1121            break;
1122        }
1123      }
1124    } finally {
1125      int processedMutationIndex = 0;
1126      for (Action mutation : mutations) {
1127        // The non-null mArray[i] means the cell scanner has been read.
1128        if (mArray[processedMutationIndex++] == null) {
1129          skipCellsForMutation(mutation, cells);
1130        }
1131      }
1132      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1133    }
1134  }
1135
1136  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
1137    boolean batchContainsDelete) {
1138    final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
1139    if (metricsRegionServer != null) {
1140      long after = EnvironmentEdgeManager.currentTime();
1141      if (batchContainsPuts) {
1142        metricsRegionServer.updatePutBatch(
1143            region.getTableDescriptor().getTableName(), after - starttime);
1144      }
1145      if (batchContainsDelete) {
1146        metricsRegionServer.updateDeleteBatch(
1147            region.getTableDescriptor().getTableName(), after - starttime);
1148      }
1149    }
1150  }
1151
1152  /**
1153   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
1154   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
1155   * @param region
1156   * @param mutations
1157   * @param replaySeqId
1158   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
1159   *         exceptionMessage if any
1160   * @throws IOException
1161   */
1162  private OperationStatus [] doReplayBatchOp(final HRegion region,
1163      final List<MutationReplay> mutations, long replaySeqId) throws IOException {
1164    long before = EnvironmentEdgeManager.currentTime();
1165    boolean batchContainsPuts = false, batchContainsDelete = false;
1166    try {
1167      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
1168        MutationReplay m = it.next();
1169
1170        if (m.getType() == MutationType.PUT) {
1171          batchContainsPuts = true;
1172        } else {
1173          batchContainsDelete = true;
1174        }
1175
1176        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
1177        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
1178        if (metaCells != null && !metaCells.isEmpty()) {
1179          for (Cell metaCell : metaCells) {
1180            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
1181            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1182            HRegion hRegion = region;
1183            if (compactionDesc != null) {
1184              // replay the compaction. Remove the files from stores only if we are the primary
1185              // region replica (thus own the files)
1186              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
1187                replaySeqId);
1188              continue;
1189            }
1190            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
1191            if (flushDesc != null && !isDefaultReplica) {
1192              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
1193              continue;
1194            }
1195            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
1196            if (regionEvent != null && !isDefaultReplica) {
1197              hRegion.replayWALRegionEventMarker(regionEvent);
1198              continue;
1199            }
1200            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
1201            if (bulkLoadEvent != null) {
1202              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
1203              continue;
1204            }
1205          }
1206          it.remove();
1207        }
1208      }
1209      requestCount.increment();
1210      if (!region.getRegionInfo().isMetaRegion()) {
1211        regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
1212      }
1213      return region.batchReplay(mutations.toArray(
1214        new MutationReplay[mutations.size()]), replaySeqId);
1215    } finally {
1216      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1217    }
1218  }
1219
1220  private void closeAllScanners() {
1221    // Close any outstanding scanners. Means they'll get an UnknownScanner
1222    // exception next time they come in.
1223    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
1224      try {
1225        e.getValue().s.close();
1226      } catch (IOException ioe) {
1227        LOG.warn("Closing scanner " + e.getKey(), ioe);
1228      }
1229    }
1230  }
1231
  // Exposed for testing
  /**
   * Hook for reporting oversized batch requests; tests inject an implementation
   * to observe the warning without scraping log output.
   */
  interface LogDelegate {
    void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThreshold);
  }
1236
1237  private static LogDelegate DEFAULT_LOG_DELEGATE = new LogDelegate() {
1238    @Override
1239    public void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThreshold) {
1240      if (LOG.isWarnEnabled()) {
1241        LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
1242            + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
1243            + RpcServer.getRequestUserName().orElse(null) + "/"
1244            + RpcServer.getRemoteAddress().orElse(null)
1245            + " first region in multi=" + firstRegionName);
1246      }
1247    }
1248  };
1249
  // Delegate used to report oversized batch requests; replaceable in tests.
  private final LogDelegate ld;
1251
  /**
   * Construct the RPC services for the given region server with the default
   * large-batch warning logger.
   * @throws IOException if the underlying RPC server cannot be created or bound
   */
  public RSRpcServices(final HRegionServer rs) throws IOException {
    this(rs, DEFAULT_LOG_DELEGATE);
  }
1255
  // Directly invoked only for testing
  RSRpcServices(final HRegionServer rs, final LogDelegate ld) throws IOException {
    final Configuration conf = rs.getConfiguration();
    this.ld = ld;
    regionServer = rs;
    rowSizeWarnThreshold = conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
    rejectRowsWithSizeOverThreshold =
      conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);

    // Instantiate the configured RpcSchedulerFactory reflectively; a bad class name
    // or constructor surfaces as IllegalArgumentException.
    final RpcSchedulerFactory rpcSchedulerFactory;
    try {
      rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class)
          .getDeclaredConstructor().newInstance();
    } catch (NoSuchMethodException | InvocationTargetException |
        InstantiationException | IllegalAccessException e) {
      throw new IllegalArgumentException(e);
    }
    // Server to handle client requests.
    final InetSocketAddress initialIsa;
    final InetSocketAddress bindAddress;
    // Master and region server resolve hostname/port from different config keys.
    if(this instanceof MasterRpcServices) {
      String hostname = DNS.getHostname(conf, DNS.ServerType.MASTER);
      int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress = new InetSocketAddress(conf.get("hbase.master.ipc.address", hostname), port);
    } else {
      String hostname = DNS.getHostname(conf, DNS.ServerType.REGIONSERVER);
      int port = conf.getInt(HConstants.REGIONSERVER_PORT,
        HConstants.DEFAULT_REGIONSERVER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress =
          new InetSocketAddress(conf.get("hbase.regionserver.ipc.address", hostname), port);
    }
    if (initialIsa.getAddress() == null) {
      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
    }
    priority = createPriority();
    // Using Address means we don't get the IP too. Shorten it more even to just the host name
    // w/o the domain.
    final String name = rs.getProcessName() + "/" +
        Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain();
    // Set how many times to retry talking to another server over Connection.
    ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG);
    rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name);
    rpcServer.setRsRpcServices(this);
    // The slow-log recorder only applies to region servers, not the master.
    if (!(rs instanceof HMaster)) {
      rpcServer.setSlowLogRecorder(rs.getSlowLogRecorder());
    }
    scannerLeaseTimeoutPeriod = conf.getInt(
      HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    maxScannerResultSize = conf.getLong(
      HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
    rpcTimeout = conf.getInt(
      HConstants.HBASE_RPC_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = conf.getLong(
      REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);

    final InetSocketAddress address = rpcServer.getListenerAddress();
    if (address == null) {
      throw new IOException("Listener channel is closed");
    }
    // Set our address, however we need the final port that was given to rpcServer
    isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
    rpcServer.setErrorHandler(this);
    rs.setName(name);

    // Closed-scanner ids are remembered for one lease period so late client calls can be
    // distinguished from never-existing scanners.
    closedScanners = CacheBuilder.newBuilder()
        .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }
1331
1332  protected RpcServerInterface createRpcServer(
1333      final Server server,
1334      final RpcSchedulerFactory rpcSchedulerFactory,
1335      final InetSocketAddress bindAddress,
1336      final String name
1337  ) throws IOException {
1338    final Configuration conf = server.getConfiguration();
1339    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
1340    try {
1341      return RpcServerFactory.createRpcServer(server, name, getServices(),
1342          bindAddress, // use final bindAddress for this server.
1343          conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
1344    } catch (BindException be) {
1345      throw new IOException(be.getMessage() + ". To switch ports use the '"
1346          + HConstants.REGIONSERVER_PORT + "' configuration property.",
1347          be.getCause() != null ? be.getCause() : be);
1348    }
1349  }
1350
1351  protected Class<?> getRpcSchedulerFactoryClass() {
1352    final Configuration conf = regionServer.getConfiguration();
1353    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
1354      SimpleRpcSchedulerFactory.class);
1355  }
1356
1357  @Override
1358  public void onConfigurationChange(Configuration newConf) {
1359    if (rpcServer instanceof ConfigurationObserver) {
1360      ((ConfigurationObserver)rpcServer).onConfigurationChange(newConf);
1361    }
1362  }
1363
  /**
   * Build the {@link PriorityFunction} used to assign QoS priorities to incoming requests.
   * Subclasses may override to supply a different priority scheme.
   */
  protected PriorityFunction createPriority() {
    return new AnnotationReadingPriorityFunction(this);
  }
1367
1368  protected void requirePermission(String request, Permission.Action perm) throws IOException {
1369    if (accessChecker != null) {
1370      accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), request, null, perm);
1371    }
1372  }
1373
  /** @return the number of currently open region scanners held by this server (test hook). */
  @VisibleForTesting
  public int getScannersCount() {
    return scanners.size();
  }
1378
1379  public
1380  RegionScanner getScanner(long scannerId) {
1381    String scannerIdString = Long.toString(scannerId);
1382    RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
1383    if (scannerHolder != null) {
1384      return scannerHolder.s;
1385    }
1386    return null;
1387  }
1388
1389  public String getScanDetailsWithId(long scannerId) {
1390    RegionScanner scanner = getScanner(scannerId);
1391    if (scanner == null) {
1392      return null;
1393    }
1394    StringBuilder builder = new StringBuilder();
1395    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
1396    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
1397    return builder.toString();
1398  }
1399
1400  public String getScanDetailsWithRequest(ScanRequest request) {
1401    try {
1402      if (!request.hasRegion()) {
1403        return null;
1404      }
1405      Region region = getRegion(request.getRegion());
1406      StringBuilder builder = new StringBuilder();
1407      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
1408      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
1409      return builder.toString();
1410    } catch (IOException ignored) {
1411      return null;
1412    }
1413  }
1414
1415  /**
1416   * Get the vtime associated with the scanner.
1417   * Currently the vtime is the number of "next" calls.
1418   */
1419  long getScannerVirtualTime(long scannerId) {
1420    String scannerIdString = Long.toString(scannerId);
1421    RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
1422    if (scannerHolder != null) {
1423      return scannerHolder.getNextCallSeq();
1424    }
1425    return 0L;
1426  }
1427
1428  /**
1429   * Method to account for the size of retained cells and retained data blocks.
1430   * @param context rpc call context
1431   * @param r result to add size.
1432   * @param lastBlock last block to check whether we need to add the block size in context.
1433   * @return an object that represents the last referenced block from this response.
1434   */
1435  Object addSize(RpcCallContext context, Result r, Object lastBlock) {
1436    if (context != null && r != null && !r.isEmpty()) {
1437      for (Cell c : r.rawCells()) {
1438        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
1439
1440        // Since byte buffers can point all kinds of crazy places it's harder to keep track
1441        // of which blocks are kept alive by what byte buffer.
1442        // So we make a guess.
1443        if (c instanceof ByteBufferExtendedCell) {
1444          ByteBufferExtendedCell bbCell = (ByteBufferExtendedCell) c;
1445          ByteBuffer bb = bbCell.getValueByteBuffer();
1446          if (bb != lastBlock) {
1447            context.incrementResponseBlockSize(bb.capacity());
1448            lastBlock = bb;
1449          }
1450        } else {
1451          // We're using the last block being the same as the current block as
1452          // a proxy for pointing to a new block. This won't be exact.
1453          // If there are multiple gets that bounce back and forth
1454          // Then it's possible that this will over count the size of
1455          // referenced blocks. However it's better to over count and
1456          // use two rpcs than to OOME the regionserver.
1457          byte[] valueArray = c.getValueArray();
1458          if (valueArray != lastBlock) {
1459            context.incrementResponseBlockSize(valueArray.length);
1460            lastBlock = valueArray;
1461          }
1462        }
1463
1464      }
1465    }
1466    return lastBlock;
1467  }
1468
1469  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
1470      HRegion r, boolean needCursor) throws LeaseStillHeldException {
1471    Lease lease = regionServer.getLeaseManager().createLease(
1472        scannerName, this.scannerLeaseTimeoutPeriod, new ScannerListener(scannerName));
1473    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
1474    RpcCallback closeCallback;
1475    if (s instanceof RpcCallback) {
1476      closeCallback = (RpcCallback) s;
1477    } else {
1478      closeCallback = new RegionScannerCloseCallBack(s);
1479    }
1480    RegionScannerHolder rsh =
1481        new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback, needCursor);
1482    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
1483    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! " +
1484      scannerName;
1485    return rsh;
1486  }
1487
1488  /**
1489   * Find the HRegion based on a region specifier
1490   *
1491   * @param regionSpecifier the region specifier
1492   * @return the corresponding region
1493   * @throws IOException if the specifier is not null,
1494   *    but failed to find the region
1495   */
1496  @VisibleForTesting
1497  public HRegion getRegion(
1498      final RegionSpecifier regionSpecifier) throws IOException {
1499    return regionServer.getRegion(regionSpecifier.getValue().toByteArray());
1500  }
1501
1502  /**
1503   * Find the List of HRegions based on a list of region specifiers
1504   *
1505   * @param regionSpecifiers the list of region specifiers
1506   * @return the corresponding list of regions
1507   * @throws IOException if any of the specifiers is not null,
1508   *    but failed to find the region
1509   */
1510  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
1511      final CacheEvictionStatsBuilder stats) {
1512    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
1513    for (RegionSpecifier regionSpecifier: regionSpecifiers) {
1514      try {
1515        regions.add(regionServer.getRegion(regionSpecifier.getValue().toByteArray()));
1516      } catch (NotServingRegionException e) {
1517        stats.addException(regionSpecifier.getValue().toByteArray(), e);
1518      }
1519    }
1520    return regions;
1521  }
1522
  /** @return the priority function used to rank incoming requests (test hook). */
  @VisibleForTesting
  public PriorityFunction getPriority() {
    return priority;
  }
1527
  /** @return the hosting region server's configuration (test hook). */
  @VisibleForTesting
  public Configuration getConfiguration() {
    return regionServer.getConfiguration();
  }
1532
  /** @return the region server's RPC quota manager. */
  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return regionServer.getRegionServerRpcQuotaManager();
  }
1536
  /** @return the region server's space quota manager. */
  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return regionServer.getRegionServerSpaceQuotaManager();
  }
1540
1541  void start(ZKWatcher zkWatcher) {
1542    if (AccessChecker.isAuthorizationSupported(getConfiguration())) {
1543      accessChecker = new AccessChecker(getConfiguration());
1544    } else {
1545      accessChecker = new NoopAccessChecker(getConfiguration());
1546    }
1547    if (!getConfiguration().getBoolean("hbase.testing.nocluster", false) && zkWatcher != null) {
1548      zkPermissionWatcher =
1549          new ZKPermissionWatcher(zkWatcher, accessChecker.getAuthManager(), getConfiguration());
1550      try {
1551        zkPermissionWatcher.start();
1552      } catch (KeeperException e) {
1553        LOG.error("ZooKeeper permission watcher initialization failed", e);
1554      }
1555    }
1556    this.scannerIdGenerator = new ScannerIdGenerator(this.regionServer.serverName);
1557    rpcServer.start();
1558  }
1559
1560  void stop() {
1561    if (zkPermissionWatcher != null) {
1562      zkPermissionWatcher.close();
1563    }
1564    closeAllScanners();
1565    rpcServer.stop();
1566  }
1567
1568  /**
1569   * Called to verify that this server is up and running.
1570   */
1571  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
1572  protected void checkOpen() throws IOException {
1573    if (regionServer.isAborted()) {
1574      throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
1575    }
1576    if (regionServer.isStopped()) {
1577      throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
1578    }
1579    if (!regionServer.isDataFileSystemOk()) {
1580      throw new RegionServerStoppedException("File system not available");
1581    }
1582    if (!regionServer.isOnline()) {
1583      throw new ServerNotRunningYetException("Server " + regionServer.serverName
1584          + " is not running yet");
1585    }
1586  }
1587
1588  /**
1589   * By default, put up an Admin and a Client Service.
1590   * Set booleans <code>hbase.regionserver.admin.executorService</code> and
1591   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
1592   * Default is that both are enabled.
1593   * @return immutable list of blocking services and the security info classes that this server
1594   * supports
1595   */
1596  protected List<BlockingServiceAndInterface> getServices() {
1597    boolean admin =
1598      getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
1599    boolean client =
1600      getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
1601    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
1602    if (client) {
1603      bssi.add(new BlockingServiceAndInterface(
1604      ClientService.newReflectiveBlockingService(this),
1605      ClientService.BlockingInterface.class));
1606    }
1607    if (admin) {
1608      bssi.add(new BlockingServiceAndInterface(
1609      AdminService.newReflectiveBlockingService(this),
1610      AdminService.BlockingInterface.class));
1611    }
1612    return new org.apache.hbase.thirdparty.com.google.common.collect.
1613        ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
1614  }
1615
  /** @return the address this server's RPC listener is bound to (with the final port). */
  public InetSocketAddress getSocketAddress() {
    return isa;
  }
1619
  /** Delegate to the configured {@link PriorityFunction} to rank this request. */
  @Override
  public int getPriority(RequestHeader header, Message param, User user) {
    return priority.getPriority(header, param, user);
  }
1624
  /** Delegate to the configured {@link PriorityFunction} to compute this request's deadline. */
  @Override
  public long getDeadline(RequestHeader header, Message param) {
    return priority.getDeadline(header, param);
  }
1629
1630  /*
1631   * Check if an OOME and, if so, abort immediately to avoid creating more objects.
1632   *
1633   * @param e
1634   *
1635   * @return True if we OOME'd and are aborting.
1636   */
1637  @Override
1638  public boolean checkOOME(final Throwable e) {
1639    return exitIfOOME(e);
1640  }
1641
1642  public static boolean exitIfOOME(final Throwable e ){
1643    boolean stop = false;
1644    try {
1645      if (e instanceof OutOfMemoryError
1646          || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1647          || (e.getMessage() != null && e.getMessage().contains(
1648              "java.lang.OutOfMemoryError"))) {
1649        stop = true;
1650        LOG.error(HBaseMarkers.FATAL, "Run out of memory; "
1651          + RSRpcServices.class.getSimpleName() + " will abort itself immediately",
1652          e);
1653      }
1654    } finally {
1655      if (stop) {
1656        Runtime.getRuntime().halt(1);
1657      }
1658    }
1659    return stop;
1660  }
1661
1662  /**
1663   * Close a region on the region server.
1664   *
1665   * @param controller the RPC controller
1666   * @param request the request
1667   * @throws ServiceException
1668   */
1669  @Override
1670  @QosPriority(priority=HConstants.ADMIN_QOS)
1671  public CloseRegionResponse closeRegion(final RpcController controller,
1672      final CloseRegionRequest request) throws ServiceException {
1673    final ServerName sn = (request.hasDestinationServer() ?
1674      ProtobufUtil.toServerName(request.getDestinationServer()) : null);
1675
1676    try {
1677      checkOpen();
1678      throwOnWrongStartCode(request);
1679      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1680
1681      requestCount.increment();
1682      if (sn == null) {
1683        LOG.info("Close " + encodedRegionName + " without moving");
1684      } else {
1685        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1686      }
1687      boolean closed = regionServer.closeRegion(encodedRegionName, false, sn);
1688      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1689      return builder.build();
1690    } catch (IOException ie) {
1691      throw new ServiceException(ie);
1692    }
1693  }
1694
1695  /**
1696   * Compact a region on the region server.
1697   *
1698   * @param controller the RPC controller
1699   * @param request the request
1700   * @throws ServiceException
1701   */
1702  @Override
1703  @QosPriority(priority = HConstants.ADMIN_QOS)
1704  public CompactRegionResponse compactRegion(final RpcController controller,
1705      final CompactRegionRequest request) throws ServiceException {
1706    try {
1707      checkOpen();
1708      requestCount.increment();
1709      HRegion region = getRegion(request.getRegion());
1710      // Quota support is enabled, the requesting user is not system/super user
1711      // and a quota policy is enforced that disables compactions.
1712      if (QuotaUtil.isQuotaEnabled(getConfiguration()) &&
1713          !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null)) &&
1714          this.regionServer.getRegionServerSpaceQuotaManager()
1715              .areCompactionsDisabled(region.getTableDescriptor().getTableName())) {
1716        throw new DoNotRetryIOException(
1717            "Compactions on this region are " + "disabled due to a space quota violation.");
1718      }
1719      region.startRegionOperation(Operation.COMPACT_REGION);
1720      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1721      boolean major = request.hasMajor() && request.getMajor();
1722      if (request.hasFamily()) {
1723        byte[] family = request.getFamily().toByteArray();
1724        String log = "User-triggered " + (major ? "major " : "") + "compaction for region " +
1725            region.getRegionInfo().getRegionNameAsString() + " and family " +
1726            Bytes.toString(family);
1727        LOG.trace(log);
1728        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
1729          CompactionLifeCycleTracker.DUMMY);
1730      } else {
1731        String log = "User-triggered " + (major ? "major " : "") + "compaction for region " +
1732            region.getRegionInfo().getRegionNameAsString();
1733        LOG.trace(log);
1734        region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
1735      }
1736      return CompactRegionResponse.newBuilder().build();
1737    } catch (IOException ie) {
1738      throw new ServiceException(ie);
1739    }
1740  }
1741
1742  @Override
1743  public CompactionSwitchResponse compactionSwitch(RpcController controller,
1744      CompactionSwitchRequest request) throws ServiceException {
1745    try {
1746      checkOpen();
1747      requestCount.increment();
1748      boolean prevState = regionServer.compactSplitThread.isCompactionsEnabled();
1749      CompactionSwitchResponse response =
1750          CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
1751      if (prevState == request.getEnabled()) {
1752        // passed in requested state is same as current state. No action required
1753        return response;
1754      }
1755      regionServer.compactSplitThread.switchCompaction(request.getEnabled());
1756      return response;
1757    } catch (IOException ie) {
1758      throw new ServiceException(ie);
1759    }
1760  }
1761
1762  /**
1763   * Flush a region on the region server.
1764   *
1765   * @param controller the RPC controller
1766   * @param request the request
1767   * @throws ServiceException
1768   */
1769  @Override
1770  @QosPriority(priority=HConstants.ADMIN_QOS)
1771  public FlushRegionResponse flushRegion(final RpcController controller,
1772      final FlushRegionRequest request) throws ServiceException {
1773    try {
1774      checkOpen();
1775      requestCount.increment();
1776      HRegion region = getRegion(request.getRegion());
1777      LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1778      boolean shouldFlush = true;
1779      if (request.hasIfOlderThanTs()) {
1780        shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1781      }
1782      FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1783      if (shouldFlush) {
1784        boolean writeFlushWalMarker =  request.hasWriteFlushWalMarker() ?
1785            request.getWriteFlushWalMarker() : false;
1786        // Go behind the curtain so we can manage writing of the flush WAL marker
1787        HRegion.FlushResultImpl flushResult =
1788            region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1789        boolean compactionNeeded = flushResult.isCompactionNeeded();
1790        if (compactionNeeded) {
1791          regionServer.compactSplitThread.requestSystemCompaction(region,
1792            "Compaction through user triggered flush");
1793        }
1794        builder.setFlushed(flushResult.isFlushSucceeded());
1795        builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1796      }
1797      builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1798      return builder.build();
1799    } catch (DroppedSnapshotException ex) {
1800      // Cache flush can fail in a few places. If it fails in a critical
1801      // section, we get a DroppedSnapshotException and a replay of wal
1802      // is required. Currently the only way to do this is a restart of
1803      // the server.
1804      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1805      throw new ServiceException(ex);
1806    } catch (IOException ie) {
1807      throw new ServiceException(ie);
1808    }
1809  }
1810
1811  @Override
1812  @QosPriority(priority=HConstants.ADMIN_QOS)
1813  public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1814      final GetOnlineRegionRequest request) throws ServiceException {
1815    try {
1816      checkOpen();
1817      requestCount.increment();
1818      Map<String, HRegion> onlineRegions = regionServer.getOnlineRegions();
1819      List<RegionInfo> list = new ArrayList<>(onlineRegions.size());
1820      for (HRegion region: onlineRegions.values()) {
1821        list.add(region.getRegionInfo());
1822      }
1823      list.sort(RegionInfo.COMPARATOR);
1824      return ResponseConverter.buildGetOnlineRegionResponse(list);
1825    } catch (IOException ie) {
1826      throw new ServiceException(ie);
1827    }
1828  }
1829
1830  // Master implementation of this Admin Service differs given it is not
1831  // able to supply detail only known to RegionServer. See note on
1832  // MasterRpcServers#getRegionInfo.
1833  @Override
1834  @QosPriority(priority=HConstants.ADMIN_QOS)
1835  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
1836      final GetRegionInfoRequest request) throws ServiceException {
1837    try {
1838      checkOpen();
1839      requestCount.increment();
1840      HRegion region = getRegion(request.getRegion());
1841      RegionInfo info = region.getRegionInfo();
1842      byte[] bestSplitRow = null;
1843      boolean shouldSplit = true;
1844      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
1845        HRegion r = region;
1846        region.startRegionOperation(Operation.SPLIT_REGION);
1847        r.forceSplit(null);
1848        // Even after setting force split if split policy says no to split then we should not split.
1849        shouldSplit = region.getSplitPolicy().shouldSplit() && !info.isMetaRegion();
1850        bestSplitRow = r.checkSplit();
1851        // when all table data are in memstore, bestSplitRow = null
1852        // try to flush region first
1853        if(bestSplitRow == null) {
1854          r.flush(true);
1855          bestSplitRow = r.checkSplit();
1856        }
1857        r.clearSplit();
1858      }
1859      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
1860      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
1861      if (request.hasCompactionState() && request.getCompactionState()) {
1862        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
1863      }
1864      builder.setSplittable(region.isSplittable() && shouldSplit);
1865      builder.setMergeable(region.isMergeable());
1866      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
1867        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
1868      }
1869      return builder.build();
1870    } catch (IOException ie) {
1871      throw new ServiceException(ie);
1872    }
1873  }
1874
1875  @Override
1876  @QosPriority(priority=HConstants.ADMIN_QOS)
1877  public GetRegionLoadResponse getRegionLoad(RpcController controller,
1878      GetRegionLoadRequest request) throws ServiceException {
1879
1880    List<HRegion> regions;
1881    if (request.hasTableName()) {
1882      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
1883      regions = regionServer.getRegions(tableName);
1884    } else {
1885      regions = regionServer.getRegions();
1886    }
1887    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
1888    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
1889    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1890
1891    try {
1892      for (HRegion region : regions) {
1893        rLoads.add(regionServer.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
1894      }
1895    } catch (IOException e) {
1896      throw new ServiceException(e);
1897    }
1898    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
1899    builder.addAllRegionLoads(rLoads);
1900    return builder.build();
1901  }
1902
1903  @Override
1904  @QosPriority(priority=HConstants.ADMIN_QOS)
1905  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
1906    ClearCompactionQueuesRequest request) throws ServiceException {
1907    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
1908        + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
1909    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
1910    requestCount.increment();
1911    if (clearCompactionQueues.compareAndSet(false,true)) {
1912      try {
1913        checkOpen();
1914        regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues();
1915        for (String queueName : request.getQueueNameList()) {
1916          LOG.debug("clear " + queueName + " compaction queue");
1917          switch (queueName) {
1918            case "long":
1919              regionServer.compactSplitThread.clearLongCompactionsQueue();
1920              break;
1921            case "short":
1922              regionServer.compactSplitThread.clearShortCompactionsQueue();
1923              break;
1924            default:
1925              LOG.warn("Unknown queue name " + queueName);
1926              throw new IOException("Unknown queue name " + queueName);
1927          }
1928        }
1929        regionServer.getRegionServerCoprocessorHost().postClearCompactionQueues();
1930      } catch (IOException ie) {
1931        throw new ServiceException(ie);
1932      } finally {
1933        clearCompactionQueues.set(false);
1934      }
1935    } else {
1936      LOG.warn("Clear compactions queue is executing by other admin.");
1937    }
1938    return respBuilder.build();
1939  }
1940
1941  /**
1942   * Get some information of the region server.
1943   *
1944   * @param controller the RPC controller
1945   * @param request the request
1946   * @throws ServiceException
1947   */
1948  @Override
1949  @QosPriority(priority=HConstants.ADMIN_QOS)
1950  public GetServerInfoResponse getServerInfo(final RpcController controller,
1951      final GetServerInfoRequest request) throws ServiceException {
1952    try {
1953      checkOpen();
1954    } catch (IOException ie) {
1955      throw new ServiceException(ie);
1956    }
1957    requestCount.increment();
1958    int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
1959    return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
1960  }
1961
1962  @Override
1963  @QosPriority(priority=HConstants.ADMIN_QOS)
1964  public GetStoreFileResponse getStoreFile(final RpcController controller,
1965      final GetStoreFileRequest request) throws ServiceException {
1966    try {
1967      checkOpen();
1968      HRegion region = getRegion(request.getRegion());
1969      requestCount.increment();
1970      Set<byte[]> columnFamilies;
1971      if (request.getFamilyCount() == 0) {
1972        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
1973      } else {
1974        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
1975        for (ByteString cf: request.getFamilyList()) {
1976          columnFamilies.add(cf.toByteArray());
1977        }
1978      }
1979      int nCF = columnFamilies.size();
1980      List<String>  fileList = region.getStoreFileList(
1981        columnFamilies.toArray(new byte[nCF][]));
1982      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1983      builder.addAllStoreFile(fileList);
1984      return builder.build();
1985    } catch (IOException ie) {
1986      throw new ServiceException(ie);
1987    }
1988  }
1989
1990  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
1991    if (!request.hasServerStartCode()) {
1992      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
1993      return;
1994    }
1995    throwOnWrongStartCode(request.getServerStartCode());
1996  }
1997
1998  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
1999    if (!request.hasServerStartCode()) {
2000      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
2001      return;
2002    }
2003    throwOnWrongStartCode(request.getServerStartCode());
2004  }
2005
2006  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
2007    // check that we are the same server that this RPC is intended for.
2008    if (regionServer.serverName.getStartcode() != serverStartCode) {
2009      throw new ServiceException(new DoNotRetryIOException(
2010        "This RPC was intended for a " + "different server with startCode: " + serverStartCode +
2011          ", this server is: " + regionServer.serverName));
2012    }
2013  }
2014
2015  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
2016    if (req.getOpenRegionCount() > 0) {
2017      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
2018        throwOnWrongStartCode(openReq);
2019      }
2020    }
2021    if (req.getCloseRegionCount() > 0) {
2022      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
2023        throwOnWrongStartCode(closeReq);
2024      }
2025    }
2026  }
2027
2028  /**
2029   * Open asynchronously a region or a set of regions on the region server.
2030   *
2031   * The opening is coordinated by ZooKeeper, and this method requires the znode to be created
2032   *  before being called. As a consequence, this method should be called only from the master.
2033   * <p>
2034   * Different manages states for the region are:
2035   * </p><ul>
2036   *  <li>region not opened: the region opening will start asynchronously.</li>
2037   *  <li>a close is already in progress: this is considered as an error.</li>
2038   *  <li>an open is already in progress: this new open request will be ignored. This is important
2039   *  because the Master can do multiple requests if it crashes.</li>
2040   *  <li>the region is already opened:  this new open request will be ignored.</li>
2041   *  </ul>
2042   * <p>
2043   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
2044   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
2045   * errors are put in the response as FAILED_OPENING.
2046   * </p>
2047   * @param controller the RPC controller
2048   * @param request the request
2049   * @throws ServiceException
2050   */
2051  @Override
2052  @QosPriority(priority=HConstants.ADMIN_QOS)
2053  public OpenRegionResponse openRegion(final RpcController controller,
2054      final OpenRegionRequest request) throws ServiceException {
2055    requestCount.increment();
2056    throwOnWrongStartCode(request);
2057
2058    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
2059    final int regionCount = request.getOpenInfoCount();
2060    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
2061    final boolean isBulkAssign = regionCount > 1;
2062    try {
2063      checkOpen();
2064    } catch (IOException ie) {
2065      TableName tableName = null;
2066      if (regionCount == 1) {
2067        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri = request.getOpenInfo(0).getRegion();
2068        if (ri != null) {
2069          tableName = ProtobufUtil.toTableName(ri.getTableName());
2070        }
2071      }
2072      if (!TableName.META_TABLE_NAME.equals(tableName)) {
2073        throw new ServiceException(ie);
2074      }
2075      // We are assigning meta, wait a little for regionserver to finish initialization.
2076      int timeout = regionServer.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
2077        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout
2078      long endTime = System.currentTimeMillis() + timeout;
2079      synchronized (regionServer.online) {
2080        try {
2081          while (System.currentTimeMillis() <= endTime
2082              && !regionServer.isStopped() && !regionServer.isOnline()) {
2083            regionServer.online.wait(regionServer.msgInterval);
2084          }
2085          checkOpen();
2086        } catch (InterruptedException t) {
2087          Thread.currentThread().interrupt();
2088          throw new ServiceException(t);
2089        } catch (IOException e) {
2090          throw new ServiceException(e);
2091        }
2092      }
2093    }
2094
2095    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
2096
2097    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
2098      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
2099      TableDescriptor htd;
2100      try {
2101        String encodedName = region.getEncodedName();
2102        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
2103        final HRegion onlineRegion = regionServer.getRegion(encodedName);
2104        if (onlineRegion != null) {
2105          // The region is already online. This should not happen any more.
2106          String error = "Received OPEN for the region:"
2107            + region.getRegionNameAsString() + ", which is already online";
2108          LOG.warn(error);
2109          //regionServer.abort(error);
2110          //throw new IOException(error);
2111          builder.addOpeningState(RegionOpeningState.OPENED);
2112          continue;
2113        }
2114        LOG.info("Open " + region.getRegionNameAsString());
2115
2116        final Boolean previous = regionServer.getRegionsInTransitionInRS().putIfAbsent(
2117          encodedNameBytes, Boolean.TRUE);
2118
2119        if (Boolean.FALSE.equals(previous)) {
2120          if (regionServer.getRegion(encodedName) != null) {
2121            // There is a close in progress. This should not happen any more.
2122            String error = "Received OPEN for the region:"
2123              + region.getRegionNameAsString() + ", which we are already trying to CLOSE";
2124            regionServer.abort(error);
2125            throw new IOException(error);
2126          }
2127          regionServer.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
2128        }
2129
2130        if (Boolean.TRUE.equals(previous)) {
2131          // An open is in progress. This is supported, but let's log this.
2132          LOG.info("Receiving OPEN for the region:" +
2133            region.getRegionNameAsString() + ", which we are already trying to OPEN"
2134              + " - ignoring this new request for this region.");
2135        }
2136
2137        // We are opening this region. If it moves back and forth for whatever reason, we don't
2138        // want to keep returning the stale moved record while we are opening/if we close again.
2139        regionServer.removeFromMovedRegions(region.getEncodedName());
2140
2141        if (previous == null || !previous.booleanValue()) {
2142          htd = htds.get(region.getTable());
2143          if (htd == null) {
2144            htd = regionServer.tableDescriptors.get(region.getTable());
2145            htds.put(region.getTable(), htd);
2146          }
2147          if (htd == null) {
2148            throw new IOException("Missing table descriptor for " + region.getEncodedName());
2149          }
2150          // If there is no action in progress, we can submit a specific handler.
2151          // Need to pass the expected version in the constructor.
2152          if (regionServer.executorService == null) {
2153            LOG.info("No executor executorService; skipping open request");
2154          } else {
2155            if (region.isMetaRegion()) {
2156              regionServer.executorService.submit(new OpenMetaHandler(
2157              regionServer, regionServer, region, htd, masterSystemTime));
2158            } else {
2159              if (regionOpenInfo.getFavoredNodesCount() > 0) {
2160                regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
2161                regionOpenInfo.getFavoredNodesList());
2162              }
2163              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
2164                regionServer.executorService.submit(new OpenPriorityRegionHandler(
2165                regionServer, regionServer, region, htd, masterSystemTime));
2166              } else {
2167                regionServer.executorService.submit(new OpenRegionHandler(
2168                regionServer, regionServer, region, htd, masterSystemTime));
2169              }
2170            }
2171          }
2172        }
2173
2174        builder.addOpeningState(RegionOpeningState.OPENED);
2175      } catch (IOException ie) {
2176        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
2177        if (isBulkAssign) {
2178          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
2179        } else {
2180          throw new ServiceException(ie);
2181        }
2182      }
2183    }
2184    return builder.build();
2185  }
2186
2187  /**
2188   *  Wamrmup a region on this server.
2189   *
2190   * This method should only be called by Master. It synchrnously opens the region and
2191   * closes the region bringing the most important pages in cache.
2192   * <p>
2193   *
2194   * @param controller the RPC controller
2195   * @param request the request
2196   * @throws ServiceException
2197   */
2198  @Override
2199  public WarmupRegionResponse warmupRegion(final RpcController controller,
2200      final WarmupRegionRequest request) throws ServiceException {
2201
2202    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
2203    TableDescriptor htd;
2204    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
2205
2206    try {
2207      checkOpen();
2208      String encodedName = region.getEncodedName();
2209      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
2210      final HRegion onlineRegion = regionServer.getRegion(encodedName);
2211
2212      if (onlineRegion != null) {
2213        LOG.info("Region already online. Skipping warming up " + region);
2214        return response;
2215      }
2216
2217      htd = regionServer.tableDescriptors.get(region.getTable());
2218
2219      if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
2220        LOG.info("Region is in transition. Skipping warmup " + region);
2221        return response;
2222      }
2223
2224      LOG.info("Warming up region " + region.getRegionNameAsString());
2225      HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
2226          regionServer.getConfiguration(), regionServer, null);
2227
2228    } catch (IOException ie) {
2229      LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
2230      throw new ServiceException(ie);
2231    }
2232
2233    return response;
2234  }
2235
2236  /**
2237   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
2238   * that the given mutations will be durable on the receiving RS if this method returns without any
2239   * exception.
2240   * @param controller the RPC controller
2241   * @param request the request
2242   * @throws ServiceException
2243   */
2244  @Override
2245  @QosPriority(priority = HConstants.REPLAY_QOS)
2246  public ReplicateWALEntryResponse replay(final RpcController controller,
2247      final ReplicateWALEntryRequest request) throws ServiceException {
2248    long before = EnvironmentEdgeManager.currentTime();
2249    CellScanner cells = ((HBaseRpcController) controller).cellScanner();
2250    try {
2251      checkOpen();
2252      List<WALEntry> entries = request.getEntryList();
2253      if (entries == null || entries.isEmpty()) {
2254        // empty input
2255        return ReplicateWALEntryResponse.newBuilder().build();
2256      }
2257      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
2258      HRegion region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
2259      RegionCoprocessorHost coprocessorHost =
2260          ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
2261            ? region.getCoprocessorHost()
2262            : null; // do not invoke coprocessors if this is a secondary region replica
2263      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();
2264
2265      // Skip adding the edits to WAL if this is a secondary region replica
2266      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
2267      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
2268
2269      for (WALEntry entry : entries) {
2270        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
2271          throw new NotServingRegionException("Replay request contains entries from multiple " +
2272              "regions. First region:" + regionName.toStringUtf8() + " , other region:"
2273              + entry.getKey().getEncodedRegionName());
2274        }
2275        if (regionServer.nonceManager != null && isPrimary) {
2276          long nonceGroup = entry.getKey().hasNonceGroup()
2277            ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
2278          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
2279          regionServer.nonceManager.reportOperationFromWal(
2280              nonceGroup,
2281              nonce,
2282              entry.getKey().getWriteTime());
2283        }
2284        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
2285        List<MutationReplay> edits = WALSplitUtil.getMutationsFromWALEntry(entry,
2286          cells, walEntry, durability);
2287        if (coprocessorHost != null) {
2288          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
2289          // KeyValue.
2290          if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
2291            walEntry.getSecond())) {
2292            // if bypass this log entry, ignore it ...
2293            continue;
2294          }
2295          walEntries.add(walEntry);
2296        }
2297        if(edits!=null && !edits.isEmpty()) {
2298          // HBASE-17924
2299          // sort to improve lock efficiency
2300          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
2301          long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
2302            entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
2303          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
2304          // check if it's a partial success
2305          for (int i = 0; result != null && i < result.length; i++) {
2306            if (result[i] != OperationStatus.SUCCESS) {
2307              throw new IOException(result[i].getExceptionMsg());
2308            }
2309          }
2310        }
2311      }
2312
2313      //sync wal at the end because ASYNC_WAL is used above
2314      WAL wal = region.getWAL();
2315      if (wal != null) {
2316        wal.sync();
2317      }
2318
2319      if (coprocessorHost != null) {
2320        for (Pair<WALKey, WALEdit> entry : walEntries) {
2321          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
2322            entry.getSecond());
2323        }
2324      }
2325      return ReplicateWALEntryResponse.newBuilder().build();
2326    } catch (IOException ie) {
2327      throw new ServiceException(ie);
2328    } finally {
2329      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
2330      if (metricsRegionServer != null) {
2331        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
2332      }
2333    }
2334  }
2335
2336  /**
2337   * Replicate WAL entries on the region server.
2338   *
2339   * @param controller the RPC controller
2340   * @param request the request
2341   */
2342  @Override
2343  @QosPriority(priority=HConstants.REPLICATION_QOS)
2344  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
2345      final ReplicateWALEntryRequest request) throws ServiceException {
2346    try {
2347      checkOpen();
2348      if (regionServer.getReplicationSinkService() != null) {
2349        requestCount.increment();
2350        List<WALEntry> entries = request.getEntryList();
2351        CellScanner cellScanner = ((HBaseRpcController)controller).cellScanner();
2352        regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries();
2353        regionServer.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
2354          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
2355          request.getSourceHFileArchiveDirPath());
2356        regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries();
2357        return ReplicateWALEntryResponse.newBuilder().build();
2358      } else {
2359        throw new ServiceException("Replication services are not initialized yet");
2360      }
2361    } catch (IOException ie) {
2362      throw new ServiceException(ie);
2363    }
2364  }
2365
2366  /**
2367   * Roll the WAL writer of the region server.
2368   * @param controller the RPC controller
2369   * @param request the request
2370   * @throws ServiceException
2371   */
2372  @Override
2373  public RollWALWriterResponse rollWALWriter(final RpcController controller,
2374      final RollWALWriterRequest request) throws ServiceException {
2375    try {
2376      checkOpen();
2377      requestCount.increment();
2378      regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
2379      regionServer.getWalRoller().requestRollAll();
2380      regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
2381      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
2382      return builder.build();
2383    } catch (IOException ie) {
2384      throw new ServiceException(ie);
2385    }
2386  }
2387
2388
2389  /**
2390   * Stop the region server.
2391   *
2392   * @param controller the RPC controller
2393   * @param request the request
2394   * @throws ServiceException
2395   */
2396  @Override
2397  @QosPriority(priority=HConstants.ADMIN_QOS)
2398  public StopServerResponse stopServer(final RpcController controller,
2399      final StopServerRequest request) throws ServiceException {
2400    requestCount.increment();
2401    String reason = request.getReason();
2402    regionServer.stop(reason);
2403    return StopServerResponse.newBuilder().build();
2404  }
2405
2406  @Override
2407  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
2408      UpdateFavoredNodesRequest request) throws ServiceException {
2409    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
2410    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
2411    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
2412      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
2413      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
2414        regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
2415          regionUpdateInfo.getFavoredNodesList());
2416      }
2417    }
2418    respBuilder.setResponse(openInfoList.size());
2419    return respBuilder.build();
2420  }
2421
2422  /**
2423   * Atomically bulk load several HFiles into an open region
2424   * @return true if successful, false is failed but recoverably (no action)
2425   * @throws ServiceException if failed unrecoverably
2426   */
2427  @Override
2428  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
2429      final BulkLoadHFileRequest request) throws ServiceException {
2430    long start = EnvironmentEdgeManager.currentTime();
2431    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
2432    if(clusterIds.contains(this.regionServer.clusterId)){
2433      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
2434    } else {
2435      clusterIds.add(this.regionServer.clusterId);
2436    }
2437    try {
2438      checkOpen();
2439      requestCount.increment();
2440      HRegion region = getRegion(request.getRegion());
2441      Map<byte[], List<Path>> map = null;
2442      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
2443      long sizeToBeLoaded = -1;
2444
2445      // Check to see if this bulk load would exceed the space quota for this table
2446      if (spaceQuotaEnabled) {
2447        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
2448        SpaceViolationPolicyEnforcement enforcement = activeSpaceQuotas.getPolicyEnforcement(
2449            region);
2450        if (enforcement != null) {
2451          // Bulk loads must still be atomic. We must enact all or none.
2452          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
2453          for (FamilyPath familyPath : request.getFamilyPathList()) {
2454            filePaths.add(familyPath.getPath());
2455          }
2456          // Check if the batch of files exceeds the current quota
2457          sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths);
2458        }
2459      }
2460
2461      List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
2462      for (FamilyPath familyPath : request.getFamilyPathList()) {
2463        familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath()));
2464      }
2465      if (!request.hasBulkToken()) {
2466        if (region.getCoprocessorHost() != null) {
2467          region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
2468        }
2469        try {
2470          map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
2471              request.getCopyFile(), clusterIds, request.getReplicate());
2472        } finally {
2473          if (region.getCoprocessorHost() != null) {
2474            region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
2475          }
2476        }
2477      } else {
2478        // secure bulk load
2479        map = regionServer.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
2480      }
2481      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
2482      builder.setLoaded(map != null);
2483      if (map != null) {
2484        // Treat any negative size as a flag to "ignore" updating the region size as that is
2485        // not possible to occur in real life (cannot bulk load a file with negative size)
2486        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
2487          if (LOG.isTraceEnabled()) {
2488            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
2489                + sizeToBeLoaded + " bytes");
2490          }
2491          // Inform space quotas of the new files for this region
2492          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(
2493              region.getRegionInfo(), sizeToBeLoaded);
2494        }
2495      }
2496      return builder.build();
2497    } catch (IOException ie) {
2498      throw new ServiceException(ie);
2499    } finally {
2500      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
2501      if (metricsRegionServer != null) {
2502        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
2503      }
2504    }
2505  }
2506
2507  @Override
2508  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
2509      PrepareBulkLoadRequest request) throws ServiceException {
2510    try {
2511      checkOpen();
2512      requestCount.increment();
2513
2514      HRegion region = getRegion(request.getRegion());
2515
2516      String bulkToken = regionServer.getSecureBulkLoadManager().prepareBulkLoad(region, request);
2517      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
2518      builder.setBulkToken(bulkToken);
2519      return builder.build();
2520    } catch (IOException ie) {
2521      throw new ServiceException(ie);
2522    }
2523  }
2524
2525  @Override
2526  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
2527      CleanupBulkLoadRequest request) throws ServiceException {
2528    try {
2529      checkOpen();
2530      requestCount.increment();
2531
2532      HRegion region = getRegion(request.getRegion());
2533
2534      regionServer.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
2535      return CleanupBulkLoadResponse.newBuilder().build();
2536    } catch (IOException ie) {
2537      throw new ServiceException(ie);
2538    }
2539  }
2540
2541  @Override
2542  public CoprocessorServiceResponse execService(final RpcController controller,
2543      final CoprocessorServiceRequest request) throws ServiceException {
2544    try {
2545      checkOpen();
2546      requestCount.increment();
2547      HRegion region = getRegion(request.getRegion());
2548      com.google.protobuf.Message result = execServiceOnRegion(region, request.getCall());
2549      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
2550      builder.setRegion(RequestConverter.buildRegionSpecifier(
2551        RegionSpecifierType.REGION_NAME, region.getRegionInfo().getRegionName()));
2552      // TODO: COPIES!!!!!!
2553      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).
2554        setValue(org.apache.hbase.thirdparty.com.google.protobuf.ByteString.
2555            copyFrom(result.toByteArray())));
2556      return builder.build();
2557    } catch (IOException ie) {
2558      throw new ServiceException(ie);
2559    }
2560  }
2561
2562  private com.google.protobuf.Message execServiceOnRegion(HRegion region,
2563      final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
2564    // ignore the passed in controller (from the serialized call)
2565    ServerRpcController execController = new ServerRpcController();
2566    return region.execService(execController, serviceCall);
2567  }
2568
2569  /**
2570   * Get data from a table.
2571   *
2572   * @param controller the RPC controller
2573   * @param request the get request
2574   * @throws ServiceException
2575   */
2576  @Override
2577  public GetResponse get(final RpcController controller,
2578      final GetRequest request) throws ServiceException {
2579    long before = EnvironmentEdgeManager.currentTime();
2580    OperationQuota quota = null;
2581    HRegion region = null;
2582    try {
2583      checkOpen();
2584      requestCount.increment();
2585      rpcGetRequestCount.increment();
2586      region = getRegion(request.getRegion());
2587
2588      GetResponse.Builder builder = GetResponse.newBuilder();
2589      ClientProtos.Get get = request.getGet();
2590      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
2591      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
2592      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
2593      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
2594      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
2595        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? " +
2596            "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " +
2597            "reverse Scan.");
2598      }
2599      Boolean existence = null;
2600      Result r = null;
2601      RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2602      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
2603
2604      Get clientGet = ProtobufUtil.toGet(get);
2605      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2606        existence = region.getCoprocessorHost().preExists(clientGet);
2607      }
2608      if (existence == null) {
2609        if (context != null) {
2610          r = get(clientGet, (region), null, context);
2611        } else {
2612          // for test purpose
2613          r = region.get(clientGet);
2614        }
2615        if (get.getExistenceOnly()) {
2616          boolean exists = r.getExists();
2617          if (region.getCoprocessorHost() != null) {
2618            exists = region.getCoprocessorHost().postExists(clientGet, exists);
2619          }
2620          existence = exists;
2621        }
2622      }
2623      if (existence != null) {
2624        ClientProtos.Result pbr =
2625            ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
2626        builder.setResult(pbr);
2627      } else if (r != null) {
2628        ClientProtos.Result pbr;
2629        if (isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
2630            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)) {
2631          pbr = ProtobufUtil.toResultNoData(r);
2632          ((HBaseRpcController) controller).setCellScanner(CellUtil.createCellScanner(r
2633              .rawCells()));
2634          addSize(context, r, null);
2635        } else {
2636          pbr = ProtobufUtil.toResult(r);
2637        }
2638        builder.setResult(pbr);
2639      }
2640      //r.cells is null when an table.exists(get) call
2641      if (r != null && r.rawCells() != null) {
2642        quota.addGetResult(r);
2643      }
2644      return builder.build();
2645    } catch (IOException ie) {
2646      throw new ServiceException(ie);
2647    } finally {
2648      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
2649      if (metricsRegionServer != null) {
2650        TableDescriptor td = region != null? region.getTableDescriptor(): null;
2651        if (td != null) {
2652          metricsRegionServer.updateGet(
2653              td.getTableName(), EnvironmentEdgeManager.currentTime() - before);
2654        }
2655      }
2656      if (quota != null) {
2657        quota.close();
2658      }
2659    }
2660  }
2661
  /**
   * Server-side implementation of a single Get, run as a one-shot scan so the scanner can be
   * closed via the RPC callback machinery.
   * @param get the client get, already converted from protobuf
   * @param region the region to read from
   * @param closeCallBack aggregating close callback when called from multi(); null for a
   *   plain get(), in which case the scanner is closed via the RPC call context
   * @param context the current RPC call context; assumed non-null when closeCallBack is null
   *   (the only caller passing a null context uses the region.get() test path instead)
   * @return the result, marked stale for non-default replicas
   * @throws IOException if the scan fails
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
      RpcCallContext context) throws IOException {
    region.prepareGet(get);
    // Reads served by a non-default replica are flagged stale in the Result.
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    long before = EnvironmentEdgeManager.currentTime();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        // Coprocessor chose to bypass the core get; return whatever it put in results.
        region.metricsUpdateForGet(results, before);
        return Result
            .create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          // NOTE(review): context is dereferenced here without a null check -- assumed
          // non-null on this path; confirm against callers.
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet(results, before);

    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
  }
2712
2713  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
2714    int sum = 0;
2715    String firstRegionName = null;
2716    for (RegionAction regionAction : request.getRegionActionList()) {
2717      if (sum == 0) {
2718        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
2719      }
2720      sum += regionAction.getActionCount();
2721    }
2722    if (sum > rowSizeWarnThreshold) {
2723      ld.logBatchWarning(firstRegionName, sum, rowSizeWarnThreshold);
2724      if (rejectRowsWithSizeOverThreshold) {
2725        throw new ServiceException(
2726          "Rejecting large batch operation for current batch with firstRegionName: "
2727            + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
2728            + rowSizeWarnThreshold);
2729      }
2730    }
2731  }
2732
2733  /**
2734   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2735   *
2736   * @param rpcc the RPC controller
2737   * @param request the multi request
2738   * @throws ServiceException
2739   */
2740  @Override
2741  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2742  throws ServiceException {
2743    try {
2744      checkOpen();
2745    } catch (IOException ie) {
2746      throw new ServiceException(ie);
2747    }
2748
2749    checkBatchSizeAndLogLargeSize(request);
2750
2751    // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
2752    // It is also the conduit via which we pass back data.
2753    HBaseRpcController controller = (HBaseRpcController)rpcc;
2754    CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
2755    if (controller != null) {
2756      controller.setCellScanner(null);
2757    }
2758
2759    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2760
2761    // this will contain all the cells that we need to return. It's created later, if needed.
2762    List<CellScannable> cellsToReturn = null;
2763    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2764    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2765    Boolean processed = null;
2766    RegionScannersCloseCallBack closeCallBack = null;
2767    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2768    this.rpcMultiRequestCount.increment();
2769    this.requestCount.increment();
2770    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new HashMap<>(request
2771      .getRegionActionCount());
2772    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2773    for (RegionAction regionAction : request.getRegionActionList()) {
2774      OperationQuota quota;
2775      HRegion region;
2776      regionActionResultBuilder.clear();
2777      RegionSpecifier regionSpecifier = regionAction.getRegion();
2778      try {
2779        region = getRegion(regionSpecifier);
2780        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
2781      } catch (IOException e) {
2782        rpcServer.getMetrics().exception(e);
2783        regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2784        responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2785        // All Mutations in this RegionAction not executed as we can not see the Region online here
2786        // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
2787        // corresponding to these Mutations.
2788        skipCellsForMutations(regionAction.getActionList(), cellScanner);
2789        continue;  // For this region it's a failure.
2790      }
2791
2792      if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2793        // How does this call happen?  It may need some work to play well w/ the surroundings.
2794        // Need to return an item per Action along w/ Action index.  TODO.
2795        try {
2796          if (request.hasCondition()) {
2797            Condition condition = request.getCondition();
2798            byte[] row = condition.getRow().toByteArray();
2799            byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
2800            byte[] qualifier = condition.hasQualifier() ?
2801              condition.getQualifier().toByteArray() : null;
2802            CompareOperator op = condition.hasCompareType() ?
2803              CompareOperator.valueOf(condition.getCompareType().name()) : null;
2804            ByteArrayComparable comparator = condition.hasComparator() ?
2805              ProtobufUtil.toComparator(condition.getComparator()) : null;
2806            Filter filter = condition.hasFilter() ?
2807              ProtobufUtil.toFilter(condition.getFilter()) : null;
2808            TimeRange timeRange = condition.hasTimeRange() ?
2809              ProtobufUtil.toTimeRange(condition.getTimeRange()) :
2810              TimeRange.allTime();
2811            processed =
2812              checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
2813                qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
2814                spaceQuotaEnforcement);
2815          } else {
2816            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
2817              cellScanner, spaceQuotaEnforcement);
2818            processed = Boolean.TRUE;
2819          }
2820        } catch (IOException e) {
2821          rpcServer.getMetrics().exception(e);
2822          // As it's atomic, we may expect it's a global failure.
2823          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2824        }
2825      } else {
2826        // doNonAtomicRegionMutation manages the exception internally
2827        if (context != null && closeCallBack == null) {
2828          // An RpcCallBack that creates a list of scanners that needs to perform callBack
2829          // operation on completion of multiGets.
2830          // Set this only once
2831          closeCallBack = new RegionScannersCloseCallBack();
2832          context.setCallBack(closeCallBack);
2833        }
2834        cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2835            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
2836            spaceQuotaEnforcement);
2837      }
2838      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2839      quota.close();
2840      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2841      if(regionLoadStats != null) {
2842        regionStats.put(regionSpecifier, regionLoadStats);
2843      }
2844    }
2845    // Load the controller with the Cells to return.
2846    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2847      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2848    }
2849
2850    if (processed != null) {
2851      responseBuilder.setProcessed(processed);
2852    }
2853
2854    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
2855    for(Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat: regionStats.entrySet()){
2856      builder.addRegion(stat.getKey());
2857      builder.addStat(stat.getValue());
2858    }
2859    responseBuilder.setRegionStatistics(builder);
2860    return responseBuilder.build();
2861  }
2862
2863  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2864    if (cellScanner == null) {
2865      return;
2866    }
2867    for (Action action : actions) {
2868      skipCellsForMutation(action, cellScanner);
2869    }
2870  }
2871
2872  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2873    if (cellScanner == null) {
2874      return;
2875    }
2876    try {
2877      if (action.hasMutation()) {
2878        MutationProto m = action.getMutation();
2879        if (m.hasAssociatedCellCount()) {
2880          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2881            cellScanner.advance();
2882          }
2883        }
2884      }
2885    } catch (IOException e) {
2886      // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2887      // marked as failed as we could not see the Region here. At client side the top level
2888      // RegionAction exception will be considered first.
2889      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2890    }
2891  }
2892
2893  /**
2894   * Mutate data in a table.
2895   *
2896   * @param rpcc the RPC controller
2897   * @param request the mutate request
2898   */
2899  @Override
2900  public MutateResponse mutate(final RpcController rpcc,
2901      final MutateRequest request) throws ServiceException {
2902    // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
2903    // It is also the conduit via which we pass back data.
2904    HBaseRpcController controller = (HBaseRpcController)rpcc;
2905    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2906    OperationQuota quota = null;
2907    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2908    ActivePolicyEnforcement spaceQuotaEnforcement = null;
2909    MutationType type = null;
2910    HRegion region = null;
2911    long before = EnvironmentEdgeManager.currentTime();
2912    // Clear scanner so we are not holding on to reference across call.
2913    if (controller != null) {
2914      controller.setCellScanner(null);
2915    }
2916    try {
2917      checkOpen();
2918      requestCount.increment();
2919      rpcMutateRequestCount.increment();
2920      region = getRegion(request.getRegion());
2921      MutateResponse.Builder builder = MutateResponse.newBuilder();
2922      MutationProto mutation = request.getMutation();
2923      if (!region.getRegionInfo().isMetaRegion()) {
2924        regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
2925      }
2926      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2927      Result r = null;
2928      Boolean processed = null;
2929      type = mutation.getMutateType();
2930
2931      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
2932      spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2933
2934      switch (type) {
2935        case APPEND:
2936          // TODO: this doesn't actually check anything.
2937          r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
2938          break;
2939        case INCREMENT:
2940          // TODO: this doesn't actually check anything.
2941          r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
2942          break;
2943        case PUT:
2944          Put put = ProtobufUtil.toPut(mutation, cellScanner);
2945          checkCellSizeLimit(region, put);
2946          // Throws an exception when violated
2947          spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
2948          quota.addMutation(put);
2949          if (request.hasCondition()) {
2950            Condition condition = request.getCondition();
2951            byte[] row = condition.getRow().toByteArray();
2952            byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
2953            byte[] qualifier = condition.hasQualifier() ?
2954              condition.getQualifier().toByteArray() : null;
2955            CompareOperator op = condition.hasCompareType() ?
2956              CompareOperator.valueOf(condition.getCompareType().name()) : null;
2957            ByteArrayComparable comparator = condition.hasComparator() ?
2958              ProtobufUtil.toComparator(condition.getComparator()) : null;
2959            Filter filter = condition.hasFilter() ?
2960              ProtobufUtil.toFilter(condition.getFilter()) : null;
2961            TimeRange timeRange = condition.hasTimeRange() ?
2962              ProtobufUtil.toTimeRange(condition.getTimeRange()) :
2963              TimeRange.allTime();
2964            if (region.getCoprocessorHost() != null) {
2965              if (filter != null) {
2966                processed = region.getCoprocessorHost().preCheckAndPut(row, filter, put);
2967              } else {
2968                processed = region.getCoprocessorHost()
2969                  .preCheckAndPut(row, family, qualifier, op, comparator, put);
2970              }
2971            }
2972            if (processed == null) {
2973              boolean result;
2974              if (filter != null) {
2975                result = region.checkAndMutate(row, filter, timeRange, put);
2976              } else {
2977                result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange,
2978                  put);
2979              }
2980              if (region.getCoprocessorHost() != null) {
2981                if (filter != null) {
2982                  result = region.getCoprocessorHost().postCheckAndPut(row, filter, put, result);
2983                } else {
2984                  result = region.getCoprocessorHost()
2985                    .postCheckAndPut(row, family, qualifier, op, comparator, put, result);
2986                }
2987              }
2988              processed = result;
2989            }
2990          } else {
2991            region.put(put);
2992            processed = Boolean.TRUE;
2993          }
2994          break;
2995        case DELETE:
2996          Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
2997          checkCellSizeLimit(region, delete);
2998          spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
2999          quota.addMutation(delete);
3000          if (request.hasCondition()) {
3001            Condition condition = request.getCondition();
3002            byte[] row = condition.getRow().toByteArray();
3003            byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
3004            byte[] qualifier = condition.hasQualifier() ?
3005              condition.getQualifier().toByteArray() : null;
3006            CompareOperator op = condition.hasCompareType() ?
3007              CompareOperator.valueOf(condition.getCompareType().name()) : null;
3008            ByteArrayComparable comparator = condition.hasComparator() ?
3009              ProtobufUtil.toComparator(condition.getComparator()) : null;
3010            Filter filter = condition.hasFilter() ?
3011              ProtobufUtil.toFilter(condition.getFilter()) : null;
3012            TimeRange timeRange = condition.hasTimeRange() ?
3013              ProtobufUtil.toTimeRange(condition.getTimeRange()) :
3014              TimeRange.allTime();
3015            if (region.getCoprocessorHost() != null) {
3016              if (filter != null) {
3017                processed = region.getCoprocessorHost().preCheckAndDelete(row, filter, delete);
3018              } else {
3019                processed = region.getCoprocessorHost()
3020                  .preCheckAndDelete(row, family, qualifier, op, comparator, delete);
3021              }
3022            }
3023            if (processed == null) {
3024              boolean result;
3025              if (filter != null) {
3026                result = region.checkAndMutate(row, filter, timeRange, delete);
3027              } else {
3028                result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange,
3029                  delete);
3030              }
3031              if (region.getCoprocessorHost() != null) {
3032                if (filter != null) {
3033                  result = region.getCoprocessorHost().postCheckAndDelete(row, filter, delete,
3034                    result);
3035                } else {
3036                  result = region.getCoprocessorHost()
3037                    .postCheckAndDelete(row, family, qualifier, op, comparator, delete, result);
3038                }
3039              }
3040              processed = result;
3041            }
3042          } else {
3043            region.delete(delete);
3044            processed = Boolean.TRUE;
3045          }
3046          break;
3047        default:
3048          throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
3049      }
3050      if (processed != null) {
3051        builder.setProcessed(processed.booleanValue());
3052      }
3053      boolean clientCellBlockSupported = isClientCellBlockSupport(context);
3054      addResult(builder, r, controller, clientCellBlockSupported);
3055      if (clientCellBlockSupported) {
3056        addSize(context, r, null);
3057      }
3058      return builder.build();
3059    } catch (IOException ie) {
3060      regionServer.checkFileSystem();
3061      throw new ServiceException(ie);
3062    } finally {
3063      if (quota != null) {
3064        quota.close();
3065      }
3066      // Update metrics
3067      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
3068      if (metricsRegionServer != null && type != null) {
3069        long after = EnvironmentEdgeManager.currentTime();
3070        switch (type) {
3071        case DELETE:
3072          if (request.hasCondition()) {
3073            metricsRegionServer.updateCheckAndDelete(after - before);
3074          } else {
3075            metricsRegionServer.updateDelete(
3076                region == null ? null : region.getRegionInfo().getTable(), after - before);
3077          }
3078          break;
3079        case PUT:
3080          if (request.hasCondition()) {
3081            metricsRegionServer.updateCheckAndPut(after - before);
3082          } else {
3083            metricsRegionServer.updatePut(
3084                region == null ? null : region.getRegionInfo().getTable(),after - before);
3085          }
3086          break;
3087        default:
3088          break;
3089        }
3090      }
3091    }
3092  }
3093
  // This is used to keep compatible with the old client implementation. Consider remove it if we
  // decide to drop the support of the client that still sends close request to a region scanner
  // which has already been exhausted.
  @Deprecated
  private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {

    private static final long serialVersionUID = -4305297078988180130L;

    // Pre-allocated singleton exception: skip stack-trace capture so that the allocation-time
    // stack (which would be meaningless here) is never filled in and throwing stays cheap.
    @Override
    public synchronized Throwable fillInStackTrace() {
      return this;
    }
  };
3107
3108  private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
3109    String scannerName = Long.toString(request.getScannerId());
3110    RegionScannerHolder rsh = scanners.get(scannerName);
3111    if (rsh == null) {
3112      // just ignore the next or close request if scanner does not exists.
3113      if (closedScanners.getIfPresent(scannerName) != null) {
3114        throw SCANNER_ALREADY_CLOSED;
3115      } else {
3116        LOG.warn("Client tried to access missing scanner " + scannerName);
3117        throw new UnknownScannerException(
3118            "Unknown scanner '" + scannerName + "'. This can happen due to any of the following " +
3119                "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " +
3120                "long wait between consecutive client checkins, c) Server may be closing down, " +
3121                "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " +
3122                "possible fix would be increasing the value of" +
3123                "'hbase.client.scanner.timeout.period' configuration.");
3124      }
3125    }
3126    RegionInfo hri = rsh.s.getRegionInfo();
3127    // Yes, should be the same instance
3128    if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) {
3129      String msg = "Region has changed on the scanner " + scannerName + ": regionName="
3130          + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r;
3131      LOG.warn(msg + ", closing...");
3132      scanners.remove(scannerName);
3133      try {
3134        rsh.s.close();
3135      } catch (IOException e) {
3136        LOG.warn("Getting exception closing " + scannerName, e);
3137      } finally {
3138        try {
3139          regionServer.getLeaseManager().cancelLease(scannerName);
3140        } catch (LeaseException e) {
3141          LOG.warn("Getting exception closing " + scannerName, e);
3142        }
3143      }
3144      throw new NotServingRegionException(msg);
3145    }
3146    return rsh;
3147  }
3148
  /**
   * Opens a new region scanner for a scan request, registers it, and fills the response builder
   * with the new scanner id, mvcc read point and lease TTL.
   *
   * @param request scan request naming the region and carrying the proto Scan
   * @param builder response builder to populate with scanner id / mvcc read point / ttl
   * @return the registered holder for the new scanner
   * @throws IOException if the region lookup, coprocessor hooks, or scanner creation fail
   */
  private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.Builder builder)
      throws IOException {
    HRegion region = getRegion(request.getRegion());
    ClientProtos.Scan protoScan = request.getScan();
    boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
    Scan scan = ProtobufUtil.toScan(protoScan);
    // if the request doesn't set this, get the default region setting.
    if (!isLoadingCfsOnDemandSet) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }

    if (!scan.hasFamilies()) {
      // Adding all families to scanner
      for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
        scan.addFamily(family);
      }
    }
    if (region.getCoprocessorHost() != null) {
      // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
      // wrapper for the core created RegionScanner
      region.getCoprocessorHost().preScannerOpen(scan);
    }
    RegionScannerImpl coreScanner = region.getScanner(scan);
    // Keep a handle on the core scanner as the Shipper even if the post hook wraps the
    // RegionScanner below.
    Shipper shipper = coreScanner;
    RegionScanner scanner = coreScanner;
    if (region.getCoprocessorHost() != null) {
      scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
    }
    long scannerId = scannerIdGenerator.generateNewScannerId();
    builder.setScannerId(scannerId);
    builder.setMvccReadPoint(scanner.getMvccReadPoint());
    builder.setTtl(scannerLeaseTimeoutPeriod);
    String scannerName = String.valueOf(scannerId);
    return addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult());
  }
3184
3185  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
3186      throws OutOfOrderScannerNextException {
3187    // if nextCallSeq does not match throw Exception straight away. This needs to be
3188    // performed even before checking of Lease.
3189    // See HBASE-5974
3190    if (request.hasNextCallSeq()) {
3191      long callSeq = request.getNextCallSeq();
3192      if (!rsh.incNextCallSeq(callSeq)) {
3193        throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.getNextCallSeq()
3194            + " But the nextCallSeq got from client: " + request.getNextCallSeq() + "; request="
3195            + TextFormat.shortDebugString(request));
3196      }
3197    }
3198  }
3199
3200  private void addScannerLeaseBack(LeaseManager.Lease lease) {
3201    try {
3202      regionServer.getLeaseManager().addLease(lease);
3203    } catch (LeaseStillHeldException e) {
3204      // should not happen as the scanner id is unique.
3205      throw new AssertionError(e);
3206    }
3207  }
3208
3209  private long getTimeLimit(HBaseRpcController controller, boolean allowHeartbeatMessages) {
3210    // Set the time limit to be half of the more restrictive timeout value (one of the
3211    // timeout values must be positive). In the event that both values are positive, the
3212    // more restrictive of the two is used to calculate the limit.
3213    if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
3214      long timeLimitDelta;
3215      if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
3216        timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
3217      } else {
3218        timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
3219      }
3220      if (controller != null && controller.getCallTimeout() > 0) {
3221        timeLimitDelta = Math.min(timeLimitDelta, controller.getCallTimeout());
3222      }
3223      // Use half of whichever timeout value was more restrictive... But don't allow
3224      // the time limit to be less than the allowable minimum (could cause an
3225      // immediatate timeout before scanning any data).
3226      timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
3227      // XXX: Can not use EnvironmentEdge here because TestIncrementTimeRange use a
3228      // ManualEnvironmentEdge. Consider using System.nanoTime instead.
3229      return System.currentTimeMillis() + timeLimitDelta;
3230    }
3231    // Default value of timeLimit is negative to indicate no timeLimit should be
3232    // enforced.
3233    return -1L;
3234  }
3235
3236  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
3237      ScannerContext scannerContext, ScanResponse.Builder builder) {
3238    if (numOfCompleteRows >= limitOfRows) {
3239      if (LOG.isTraceEnabled()) {
3240        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows +
3241            " scannerContext: " + scannerContext);
3242      }
3243      builder.setMoreResults(false);
3244    }
3245  }
3246
3247  // return whether we have more results in region.
3248  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
3249      long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
3250      ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCallContext context)
3251      throws IOException {
3252    HRegion region = rsh.r;
3253    RegionScanner scanner = rsh.s;
3254    long maxResultSize;
3255    if (scanner.getMaxResultSize() > 0) {
3256      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
3257    } else {
3258      maxResultSize = maxQuotaResultSize;
3259    }
3260    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
3261    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
3262    // arbitrary 32. TODO: keep record of general size of results being returned.
3263    List<Cell> values = new ArrayList<>(32);
3264    region.startRegionOperation(Operation.SCAN);
3265    try {
3266      int numOfResults = 0;
3267      int numOfCompleteRows = 0;
3268      long before = EnvironmentEdgeManager.currentTime();
3269      synchronized (scanner) {
3270        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
3271        boolean clientHandlesPartials =
3272            request.hasClientHandlesPartials() && request.getClientHandlesPartials();
3273        boolean clientHandlesHeartbeats =
3274            request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
3275
3276        // On the server side we must ensure that the correct ordering of partial results is
3277        // returned to the client to allow them to properly reconstruct the partial results.
3278        // If the coprocessor host is adding to the result list, we cannot guarantee the
3279        // correct ordering of partial results and so we prevent partial results from being
3280        // formed.
3281        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
3282        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
3283        boolean moreRows = false;
3284
3285        // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
3286        // certain time threshold on the server. When the time threshold is exceeded, the
3287        // server stops the scan and sends back whatever Results it has accumulated within
3288        // that time period (may be empty). Since heartbeat messages have the potential to
3289        // create partial Results (in the event that the timeout occurs in the middle of a
3290        // row), we must only generate heartbeat messages when the client can handle both
3291        // heartbeats AND partials
3292        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
3293
3294        long timeLimit = getTimeLimit(controller, allowHeartbeatMessages);
3295
3296        final LimitScope sizeScope =
3297            allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
3298        final LimitScope timeScope =
3299            allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
3300
3301        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
3302
3303        // Configure with limits for this RPC. Set keep progress true since size progress
3304        // towards size limit should be kept between calls to nextRaw
3305        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
3306        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
3307        // of heap size occupied by cells(being read). Cell data means its key and value parts.
3308        contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize);
3309        contextBuilder.setBatchLimit(scanner.getBatch());
3310        contextBuilder.setTimeLimit(timeScope, timeLimit);
3311        contextBuilder.setTrackMetrics(trackMetrics);
3312        ScannerContext scannerContext = contextBuilder.build();
3313        boolean limitReached = false;
3314        while (numOfResults < maxResults) {
3315          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
3316          // batch limit is a limit on the number of cells per Result. Thus, if progress is
3317          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
3318          // reset the batch progress between nextRaw invocations since we don't want the
3319          // batch progress from previous calls to affect future calls
3320          scannerContext.setBatchProgress(0);
3321
3322          // Collect values to be returned here
3323          moreRows = scanner.nextRaw(values, scannerContext);
3324
3325          if (!values.isEmpty()) {
3326            if (limitOfRows > 0) {
3327              // First we need to check if the last result is partial and we have a row change. If
3328              // so then we need to increase the numOfCompleteRows.
3329              if (results.isEmpty()) {
3330                if (rsh.rowOfLastPartialResult != null &&
3331                    !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)) {
3332                  numOfCompleteRows++;
3333                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
3334                    builder);
3335                }
3336              } else {
3337                Result lastResult = results.get(results.size() - 1);
3338                if (lastResult.mayHaveMoreCellsInRow() &&
3339                    !CellUtil.matchingRows(values.get(0), lastResult.getRow())) {
3340                  numOfCompleteRows++;
3341                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
3342                    builder);
3343                }
3344              }
3345              if (builder.hasMoreResults() && !builder.getMoreResults()) {
3346                break;
3347              }
3348            }
3349            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
3350            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
3351            lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
3352            results.add(r);
3353            numOfResults++;
3354            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
3355              numOfCompleteRows++;
3356              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
3357              if (builder.hasMoreResults() && !builder.getMoreResults()) {
3358                break;
3359              }
3360            }
3361          } else if (!moreRows && !results.isEmpty()) {
3362            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
3363            // last result is false. Otherwise it's possible that: the first nextRaw returned
3364            // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
3365            // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
3366            // return with moreRows=false, which means moreResultsInRegion would be false, it will
3367            // be a contradictory state (HBASE-21206).
3368            int lastIdx = results.size() - 1;
3369            Result r = results.get(lastIdx);
3370            if (r.mayHaveMoreCellsInRow()) {
3371              results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
3372            }
3373          }
3374          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
3375          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
3376          boolean resultsLimitReached = numOfResults >= maxResults;
3377          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;
3378
3379          if (limitReached || !moreRows) {
3380            // We only want to mark a ScanResponse as a heartbeat message in the event that
3381            // there are more values to be read server side. If there aren't more values,
3382            // marking it as a heartbeat is wasteful because the client will need to issue
3383            // another ScanRequest only to realize that they already have all the values
3384            if (moreRows && timeLimitReached) {
3385              // Heartbeat messages occur when the time limit has been reached.
3386              builder.setHeartbeatMessage(true);
3387              if (rsh.needCursor) {
3388                Cell cursorCell = scannerContext.getLastPeekedCell();
3389                if (cursorCell != null) {
3390                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
3391                }
3392              }
3393            }
3394            break;
3395          }
3396          values.clear();
3397        }
3398        builder.setMoreResultsInRegion(moreRows);
3399        // Check to see if the client requested that we track metrics server side. If the
3400        // client requested metrics, retrieve the metrics from the scanner context.
3401        if (trackMetrics) {
3402          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
3403          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
3404          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
3405
3406          for (Entry<String, Long> entry : metrics.entrySet()) {
3407            pairBuilder.setName(entry.getKey());
3408            pairBuilder.setValue(entry.getValue());
3409            metricBuilder.addMetrics(pairBuilder.build());
3410          }
3411
3412          builder.setScanMetrics(metricBuilder.build());
3413        }
3414      }
3415      long end = EnvironmentEdgeManager.currentTime();
3416      long responseCellSize = context != null ? context.getResponseCellSize() : 0;
3417      region.getMetrics().updateScanTime(end - before);
3418      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
3419      if (metricsRegionServer != null) {
3420        metricsRegionServer.updateScanSize(
3421            region.getTableDescriptor().getTableName(), responseCellSize);
3422        metricsRegionServer.updateScanTime(
3423            region.getTableDescriptor().getTableName(), end - before);
3424      }
3425    } finally {
3426      region.closeRegionOperation();
3427    }
3428    // coprocessor postNext hook
3429    if (region.getCoprocessorHost() != null) {
3430      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
3431    }
3432  }
3433
3434  /**
3435   * Scan data in a table.
3436   *
3437   * @param controller the RPC controller
3438   * @param request the scan request
3439   * @throws ServiceException
3440   */
3441  @Override
3442  public ScanResponse scan(final RpcController controller, final ScanRequest request)
3443      throws ServiceException {
3444    if (controller != null && !(controller instanceof HBaseRpcController)) {
3445      throw new UnsupportedOperationException(
3446          "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
3447    }
3448    if (!request.hasScannerId() && !request.hasScan()) {
3449      throw new ServiceException(
3450          new DoNotRetryIOException("Missing required input: scannerId or scan"));
3451    }
3452    try {
3453      checkOpen();
3454    } catch (IOException e) {
3455      if (request.hasScannerId()) {
3456        String scannerName = Long.toString(request.getScannerId());
3457        if (LOG.isDebugEnabled()) {
3458          LOG.debug(
3459            "Server shutting down and client tried to access missing scanner " + scannerName);
3460        }
3461        final LeaseManager leaseManager = regionServer.getLeaseManager();
3462        if (leaseManager != null) {
3463          try {
3464            leaseManager.cancelLease(scannerName);
3465          } catch (LeaseException le) {
3466            // No problem, ignore
3467            if (LOG.isTraceEnabled()) {
3468              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
3469            }
3470          }
3471        }
3472      }
3473      throw new ServiceException(e);
3474    }
3475    requestCount.increment();
3476    rpcScanRequestCount.increment();
3477    RegionScannerHolder rsh;
3478    ScanResponse.Builder builder = ScanResponse.newBuilder();
3479    try {
3480      if (request.hasScannerId()) {
3481        // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
3482        // for more details.
3483        builder.setScannerId(request.getScannerId());
3484        rsh = getRegionScanner(request);
3485      } else {
3486        rsh = newRegionScanner(request, builder);
3487      }
3488    } catch (IOException e) {
3489      if (e == SCANNER_ALREADY_CLOSED) {
3490        // Now we will close scanner automatically if there are no more results for this region but
3491        // the old client will still send a close request to us. Just ignore it and return.
3492        return builder.build();
3493      }
3494      throw new ServiceException(e);
3495    }
3496    HRegion region = rsh.r;
3497    String scannerName = rsh.scannerName;
3498    LeaseManager.Lease lease;
3499    try {
3500      // Remove lease while its being processed in server; protects against case
3501      // where processing of request takes > lease expiration time.
3502      lease = regionServer.getLeaseManager().removeLease(scannerName);
3503    } catch (LeaseException e) {
3504      throw new ServiceException(e);
3505    }
3506    if (request.hasRenew() && request.getRenew()) {
3507      // add back and return
3508      addScannerLeaseBack(lease);
3509      try {
3510        checkScanNextCallSeq(request, rsh);
3511      } catch (OutOfOrderScannerNextException e) {
3512        throw new ServiceException(e);
3513      }
3514      return builder.build();
3515    }
3516    OperationQuota quota;
3517    try {
3518      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
3519    } catch (IOException e) {
3520      addScannerLeaseBack(lease);
3521      throw new ServiceException(e);
3522    }
3523    try {
3524      checkScanNextCallSeq(request, rsh);
3525    } catch (OutOfOrderScannerNextException e) {
3526      addScannerLeaseBack(lease);
3527      throw new ServiceException(e);
3528    }
3529    // Now we have increased the next call sequence. If we give client an error, the retry will
3530    // never success. So we'd better close the scanner and return a DoNotRetryIOException to client
3531    // and then client will try to open a new scanner.
3532    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
3533    int rows; // this is scan.getCaching
3534    if (request.hasNumberOfRows()) {
3535      rows = request.getNumberOfRows();
3536    } else {
3537      rows = closeScanner ? 0 : 1;
3538    }
3539    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
3540    // now let's do the real scan.
3541    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
3542    RegionScanner scanner = rsh.s;
3543    // this is the limit of rows for this scan, if we the number of rows reach this value, we will
3544    // close the scanner.
3545    int limitOfRows;
3546    if (request.hasLimitOfRows()) {
3547      limitOfRows = request.getLimitOfRows();
3548    } else {
3549      limitOfRows = -1;
3550    }
3551    MutableObject<Object> lastBlock = new MutableObject<>();
3552    boolean scannerClosed = false;
3553    try {
3554      List<Result> results = new ArrayList<>(Math.min(rows, 512));
3555      if (rows > 0) {
3556        boolean done = false;
3557        // Call coprocessor. Get region info from scanner.
3558        if (region.getCoprocessorHost() != null) {
3559          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
3560          if (!results.isEmpty()) {
3561            for (Result r : results) {
3562              lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
3563            }
3564          }
3565          if (bypass != null && bypass.booleanValue()) {
3566            done = true;
3567          }
3568        }
3569        if (!done) {
3570          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
3571            results, builder, lastBlock, context);
3572        } else {
3573          builder.setMoreResultsInRegion(!results.isEmpty());
3574        }
3575      } else {
3576        // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
3577        builder.setMoreResultsInRegion(true);
3578      }
3579
3580      quota.addScanResult(results);
3581      addResults(builder, results, (HBaseRpcController) controller,
3582        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
3583        isClientCellBlockSupport(context));
3584      if (scanner.isFilterDone() && results.isEmpty()) {
3585        // If the scanner's filter - if any - is done with the scan
3586        // only set moreResults to false if the results is empty. This is used to keep compatible
3587        // with the old scan implementation where we just ignore the returned results if moreResults
3588        // is false. Can remove the isEmpty check after we get rid of the old implementation.
3589        builder.setMoreResults(false);
3590      }
3591      // Later we may close the scanner depending on this flag so here we need to make sure that we
3592      // have already set this flag.
3593      assert builder.hasMoreResultsInRegion();
3594      // we only set moreResults to false in the above code, so set it to true if we haven't set it
3595      // yet.
3596      if (!builder.hasMoreResults()) {
3597        builder.setMoreResults(true);
3598      }
3599      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
3600        // Record the last cell of the last result if it is a partial result
3601        // We need this to calculate the complete rows we have returned to client as the
3602        // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
3603        // current row. We may filter out all the remaining cells for the current row and just
3604        // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
3605        // check for row change.
3606        Result lastResult = results.get(results.size() - 1);
3607        if (lastResult.mayHaveMoreCellsInRow()) {
3608          rsh.rowOfLastPartialResult = lastResult.getRow();
3609        } else {
3610          rsh.rowOfLastPartialResult = null;
3611        }
3612      }
3613      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
3614        scannerClosed = true;
3615        closeScanner(region, scanner, scannerName, context);
3616      }
3617      return builder.build();
3618    } catch (IOException e) {
3619      try {
3620        // scanner is closed here
3621        scannerClosed = true;
3622        // The scanner state might be left in a dirty state, so we will tell the Client to
3623        // fail this RPC and close the scanner while opening up another one from the start of
3624        // row that the client has last seen.
3625        closeScanner(region, scanner, scannerName, context);
3626
3627        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
3628        // used in two different semantics.
3629        // (1) The first is to close the client scanner and bubble up the exception all the way
3630        // to the application. This is preferred when the exception is really un-recoverable
3631        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
3632        // bucket usually.
3633        // (2) Second semantics is to close the current region scanner only, but continue the
3634        // client scanner by overriding the exception. This is usually UnknownScannerException,
3635        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
3636        // application-level ClientScanner has to continue without bubbling up the exception to
3637        // the client. See ClientScanner code to see how it deals with these special exceptions.
3638        if (e instanceof DoNotRetryIOException) {
3639          throw e;
3640        }
3641
3642        // If it is a FileNotFoundException, wrap as a
3643        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
3644        if (e instanceof FileNotFoundException) {
3645          throw new DoNotRetryIOException(e);
3646        }
3647
3648        // We closed the scanner already. Instead of throwing the IOException, and client
3649        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
3650        // a special exception to save an RPC.
3651        if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
3652          // 1.4.0+ clients know how to handle
3653          throw new ScannerResetException("Scanner is closed on the server-side", e);
3654        } else {
3655          // older clients do not know about SRE. Just throw USE, which they will handle
3656          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
3657              + " scanner state for clients older than 1.3.", e);
3658        }
3659      } catch (IOException ioe) {
3660        throw new ServiceException(ioe);
3661      }
3662    } finally {
3663      if (!scannerClosed) {
3664        // Adding resets expiration time on lease.
3665        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
3666        if (context != null) {
3667          context.setCallBack(rsh.shippedCallback);
3668        } else {
3669          // When context != null, adding back the lease will be done in callback set above.
3670          addScannerLeaseBack(lease);
3671        }
3672      }
3673      quota.close();
3674    }
3675  }
3676
3677  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
3678      RpcCallContext context) throws IOException {
3679    if (region.getCoprocessorHost() != null) {
3680      if (region.getCoprocessorHost().preScannerClose(scanner)) {
3681        // bypass the actual close.
3682        return;
3683      }
3684    }
3685    RegionScannerHolder rsh = scanners.remove(scannerName);
3686    if (rsh != null) {
3687      if (context != null) {
3688        context.setCallBack(rsh.closeCallBack);
3689      } else {
3690        rsh.s.close();
3691      }
3692      if (region.getCoprocessorHost() != null) {
3693        region.getCoprocessorHost().postScannerClose(scanner);
3694      }
3695      closedScanners.put(scannerName, scannerName);
3696    }
3697  }
3698
  /**
   * Executes a region-server-scoped coprocessor service call by delegating to the region server.
   *
   * @param controller the RPC controller
   * @param request the coprocessor service invocation
   * @return the coprocessor's response
   * @throws ServiceException if the pre-check or the delegated invocation fails
   */
  @Override
  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
      CoprocessorServiceRequest request) throws ServiceException {
    rpcPreCheck("execRegionServerService");
    return regionServer.execRegionServerService(controller, request);
  }
3705
3706  @Override
3707  public UpdateConfigurationResponse updateConfiguration(
3708      RpcController controller, UpdateConfigurationRequest request)
3709      throws ServiceException {
3710    try {
3711      this.regionServer.updateConfiguration();
3712    } catch (Exception e) {
3713      throw new ServiceException(e);
3714    }
3715    return UpdateConfigurationResponse.getDefaultInstance();
3716  }
3717
3718  @Override
3719  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(
3720      RpcController controller, GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
3721    try {
3722      final RegionServerSpaceQuotaManager manager =
3723          regionServer.getRegionServerSpaceQuotaManager();
3724      final GetSpaceQuotaSnapshotsResponse.Builder builder =
3725          GetSpaceQuotaSnapshotsResponse.newBuilder();
3726      if (manager != null) {
3727        final Map<TableName,SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
3728        for (Entry<TableName,SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
3729          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
3730              .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
3731              .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue()))
3732              .build());
3733        }
3734      }
3735      return builder.build();
3736    } catch (Exception e) {
3737      throw new ServiceException(e);
3738    }
3739  }
3740
3741  @Override
3742  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
3743      ClearRegionBlockCacheRequest request) {
3744    ClearRegionBlockCacheResponse.Builder builder =
3745        ClearRegionBlockCacheResponse.newBuilder();
3746    CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
3747    List<HRegion> regions = getRegions(request.getRegionList(), stats);
3748    for (HRegion region : regions) {
3749      try {
3750        stats = stats.append(this.regionServer.clearRegionBlockCache(region));
3751      } catch (Exception e) {
3752        stats.addException(region.getRegionInfo().getRegionName(), e);
3753      }
3754    }
3755    stats.withMaxCacheSize(regionServer.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
3756    return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
3757  }
3758
3759  private void executeOpenRegionProcedures(OpenRegionRequest request,
3760      Map<TableName, TableDescriptor> tdCache) {
3761    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
3762    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
3763      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
3764      TableDescriptor tableDesc = tdCache.get(regionInfo.getTable());
3765      if (tableDesc == null) {
3766        try {
3767          tableDesc = regionServer.getTableDescriptors().get(regionInfo.getTable());
3768        } catch (IOException e) {
3769          // Here we do not fail the whole method since we also need deal with other
3770          // procedures, and we can not ignore this one, so we still schedule a
3771          // AssignRegionHandler and it will report back to master if we still can not get the
3772          // TableDescriptor.
3773          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
3774            regionInfo.getTable(), e);
3775        }
3776      }
3777      if (regionOpenInfo.getFavoredNodesCount() > 0) {
3778        regionServer.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
3779          regionOpenInfo.getFavoredNodesList());
3780      }
3781      long procId = regionOpenInfo.getOpenProcId();
3782      if (regionServer.submitRegionProcedure(procId)) {
3783        regionServer.executorService.submit(AssignRegionHandler
3784            .create(regionServer, regionInfo, procId, tableDesc,
3785                masterSystemTime));
3786      }
3787    }
3788  }
3789
3790  private void executeCloseRegionProcedures(CloseRegionRequest request) {
3791    String encodedName;
3792    try {
3793      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
3794    } catch (DoNotRetryIOException e) {
3795      throw new UncheckedIOException("Should not happen", e);
3796    }
3797    ServerName destination = request.hasDestinationServer() ?
3798        ProtobufUtil.toServerName(request.getDestinationServer()) :
3799        null;
3800    long procId = request.getCloseProcId();
3801    if (regionServer.submitRegionProcedure(procId)) {
3802      regionServer.executorService.submit(UnassignRegionHandler
3803          .create(regionServer, encodedName, procId, false, destination));
3804    }
3805  }
3806
3807  private void executeProcedures(RemoteProcedureRequest request) {
3808    RSProcedureCallable callable;
3809    try {
3810      callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
3811        .getDeclaredConstructor().newInstance();
3812    } catch (Exception e) {
3813      LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(),
3814        request.getProcId(), e);
3815      regionServer.remoteProcedureComplete(request.getProcId(), e);
3816      return;
3817    }
3818    callable.init(request.getProcData().toByteArray(), regionServer);
3819    LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
3820    regionServer.executeProcedure(request.getProcId(), callable);
3821  }
3822
3823  @Override
3824  @QosPriority(priority = HConstants.ADMIN_QOS)
3825  public ExecuteProceduresResponse executeProcedures(RpcController controller,
3826      ExecuteProceduresRequest request) throws ServiceException {
3827    try {
3828      checkOpen();
3829      throwOnWrongStartCode(request);
3830      regionServer.getRegionServerCoprocessorHost().preExecuteProcedures();
3831      if (request.getOpenRegionCount() > 0) {
3832        // Avoid reading from the TableDescritor every time(usually it will read from the file
3833        // system)
3834        Map<TableName, TableDescriptor> tdCache = new HashMap<>();
3835        request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
3836      }
3837      if (request.getCloseRegionCount() > 0) {
3838        request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
3839      }
3840      if (request.getProcCount() > 0) {
3841        request.getProcList().forEach(this::executeProcedures);
3842      }
3843      regionServer.getRegionServerCoprocessorHost().postExecuteProcedures();
3844      return ExecuteProceduresResponse.getDefaultInstance();
3845    } catch (IOException e) {
3846      throw new ServiceException(e);
3847    }
3848  }
3849
3850  @Override
3851  @QosPriority(priority = HConstants.ADMIN_QOS)
3852  public SlowLogResponses getSlowLogResponses(final RpcController controller,
3853      final SlowLogResponseRequest request) {
3854    final SlowLogRecorder slowLogRecorder =
3855      this.regionServer.getSlowLogRecorder();
3856    final List<SlowLogPayload> slowLogPayloads;
3857    slowLogPayloads = slowLogRecorder != null
3858      ? slowLogRecorder.getSlowLogPayloads(request)
3859      : Collections.emptyList();
3860    SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder()
3861      .addAllSlowLogPayloads(slowLogPayloads)
3862      .build();
3863    return slowLogResponses;
3864  }
3865
3866  @Override
3867  @QosPriority(priority = HConstants.ADMIN_QOS)
3868  public SlowLogResponses getLargeLogResponses(final RpcController controller,
3869      final SlowLogResponseRequest request) {
3870    final SlowLogRecorder slowLogRecorder =
3871      this.regionServer.getSlowLogRecorder();
3872    final List<SlowLogPayload> slowLogPayloads;
3873    slowLogPayloads = slowLogRecorder != null
3874      ? slowLogRecorder.getLargeLogPayloads(request)
3875      : Collections.emptyList();
3876    SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder()
3877      .addAllSlowLogPayloads(slowLogPayloads)
3878      .build();
3879    return slowLogResponses;
3880  }
3881
3882  @Override
3883  @QosPriority(priority = HConstants.ADMIN_QOS)
3884  public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller,
3885    final ClearSlowLogResponseRequest request) {
3886    final SlowLogRecorder slowLogRecorder =
3887      this.regionServer.getSlowLogRecorder();
3888    boolean slowLogsCleaned = Optional.ofNullable(slowLogRecorder)
3889      .map(SlowLogRecorder::clearSlowLogPayloads).orElse(false);
3890    ClearSlowLogResponses clearSlowLogResponses = ClearSlowLogResponses.newBuilder()
3891      .setIsCleaned(slowLogsCleaned)
3892      .build();
3893    return clearSlowLogResponses;
3894  }
3895
  /** Returns the scheduler used by the RPC server; exposed for tests. */
  @VisibleForTesting
  public RpcScheduler getRpcScheduler() {
    return rpcServer.getScheduler();
  }
3900
  /** Returns the access checker used to authorize RPC operations. */
  protected AccessChecker getAccessChecker() {
    return accessChecker;
  }
3904
  /** Returns the ZooKeeper permission watcher tracking ACL changes. */
  protected ZKPermissionWatcher getZkPermissionWatcher() {
    return zkPermissionWatcher;
  }
3908}