001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.lang.reflect.InvocationTargetException;
025import java.net.BindException;
026import java.net.InetSocketAddress;
027import java.net.UnknownHostException;
028import java.nio.ByteBuffer;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.Collections;
032import java.util.HashMap;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Map;
036import java.util.Map.Entry;
037import java.util.NavigableMap;
038import java.util.Set;
039import java.util.TreeSet;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ConcurrentMap;
042import java.util.concurrent.TimeUnit;
043import java.util.concurrent.atomic.AtomicBoolean;
044import java.util.concurrent.atomic.AtomicLong;
045import java.util.concurrent.atomic.LongAdder;
046
047import org.apache.commons.lang3.mutable.MutableObject;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.fs.Path;
050import org.apache.hadoop.hbase.ByteBufferExtendedCell;
051import org.apache.hadoop.hbase.CacheEvictionStats;
052import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
053import org.apache.hadoop.hbase.Cell;
054import org.apache.hadoop.hbase.CellScannable;
055import org.apache.hadoop.hbase.CellScanner;
056import org.apache.hadoop.hbase.CellUtil;
057import org.apache.hadoop.hbase.CompareOperator;
058import org.apache.hadoop.hbase.DoNotRetryIOException;
059import org.apache.hadoop.hbase.DroppedSnapshotException;
060import org.apache.hadoop.hbase.HBaseIOException;
061import org.apache.hadoop.hbase.HConstants;
062import org.apache.hadoop.hbase.MultiActionResultTooLarge;
063import org.apache.hadoop.hbase.NotServingRegionException;
064import org.apache.hadoop.hbase.PrivateCellUtil;
065import org.apache.hadoop.hbase.Server;
066import org.apache.hadoop.hbase.ServerName;
067import org.apache.hadoop.hbase.TableName;
068import org.apache.hadoop.hbase.UnknownScannerException;
069import org.apache.hadoop.hbase.client.Append;
070import org.apache.hadoop.hbase.client.ConnectionUtils;
071import org.apache.hadoop.hbase.client.Delete;
072import org.apache.hadoop.hbase.client.Durability;
073import org.apache.hadoop.hbase.client.Get;
074import org.apache.hadoop.hbase.client.Increment;
075import org.apache.hadoop.hbase.client.Mutation;
076import org.apache.hadoop.hbase.client.Put;
077import org.apache.hadoop.hbase.client.RegionInfo;
078import org.apache.hadoop.hbase.client.RegionReplicaUtil;
079import org.apache.hadoop.hbase.client.Result;
080import org.apache.hadoop.hbase.client.Row;
081import org.apache.hadoop.hbase.client.RowMutations;
082import org.apache.hadoop.hbase.client.Scan;
083import org.apache.hadoop.hbase.client.TableDescriptor;
084import org.apache.hadoop.hbase.client.VersionInfoUtil;
085import org.apache.hadoop.hbase.conf.ConfigurationObserver;
086import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
087import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
088import org.apache.hadoop.hbase.exceptions.ScannerResetException;
089import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
090import org.apache.hadoop.hbase.filter.ByteArrayComparable;
091import org.apache.hadoop.hbase.io.TimeRange;
092import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
093import org.apache.hadoop.hbase.ipc.HBaseRpcController;
094import org.apache.hadoop.hbase.ipc.PriorityFunction;
095import org.apache.hadoop.hbase.ipc.QosPriority;
096import org.apache.hadoop.hbase.ipc.RpcCallContext;
097import org.apache.hadoop.hbase.ipc.RpcCallback;
098import org.apache.hadoop.hbase.ipc.RpcServer;
099import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
100import org.apache.hadoop.hbase.ipc.RpcServerFactory;
101import org.apache.hadoop.hbase.ipc.RpcServerInterface;
102import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
103import org.apache.hadoop.hbase.ipc.ServerRpcController;
104import org.apache.hadoop.hbase.log.HBaseMarkers;
105import org.apache.hadoop.hbase.master.MasterRpcServices;
106import org.apache.hadoop.hbase.net.Address;
107import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
108import org.apache.hadoop.hbase.quotas.OperationQuota;
109import org.apache.hadoop.hbase.quotas.QuotaUtil;
110import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
111import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
112import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
113import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
114import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
115import org.apache.hadoop.hbase.regionserver.Leases.Lease;
116import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
117import org.apache.hadoop.hbase.regionserver.Region.Operation;
118import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
119import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
120import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
121import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
122import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
123import org.apache.hadoop.hbase.security.Superusers;
124import org.apache.hadoop.hbase.security.User;
125import org.apache.hadoop.hbase.security.access.AccessChecker;
126import org.apache.hadoop.hbase.security.access.Permission;
127import org.apache.hadoop.hbase.util.Bytes;
128import org.apache.hadoop.hbase.util.CollectionUtils;
129import org.apache.hadoop.hbase.util.DNS;
130import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
131import org.apache.hadoop.hbase.util.Pair;
132import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
133import org.apache.hadoop.hbase.util.Strings;
134import org.apache.hadoop.hbase.wal.WAL;
135import org.apache.hadoop.hbase.wal.WALEdit;
136import org.apache.hadoop.hbase.wal.WALKey;
137import org.apache.hadoop.hbase.wal.WALSplitter;
138import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
139import org.apache.yetus.audience.InterfaceAudience;
140import org.slf4j.Logger;
141import org.slf4j.LoggerFactory;
142
143import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
144import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
145import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
146import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
147import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
148import org.apache.hbase.thirdparty.com.google.protobuf.Message;
149import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
150import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
151import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
152import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
153import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
154import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
155import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
238
239/**
240 * Implements the regionserver RPC services.
241 */
242@InterfaceAudience.Private
243@SuppressWarnings("deprecation")
244public class RSRpcServices implements HBaseRPCErrorHandler,
245    AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
246    ConfigurationObserver {
247  protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);
248
249  /** RPC scheduler to use for the region server. */
250  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
251    "hbase.region.server.rpc.scheduler.factory.class";
252
253  /**
254   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
255   * configuration exists to prevent the scenario where a time limit is specified to be so
256   * restrictive that the time limit is reached immediately (before any cells are scanned).
257   */
258  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
259      "hbase.region.server.rpc.minimum.scan.time.limit.delta";
260  /**
261   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
262   */
263  private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
264
265  /**
266   * Number of rows in a batch operation above which a warning will be logged.
267   */
268  static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
269  /**
270   * Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
271   */
272  static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
273
274  protected static final String RESERVOIR_ENABLED_KEY = "hbase.ipc.server.reservoir.enabled";
275
276  // Request counter. (Includes requests that are not serviced by regions.)
277  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
278  final LongAdder requestCount = new LongAdder();
279
280  // Request counter for rpc get
281  final LongAdder rpcGetRequestCount = new LongAdder();
282
283  // Request counter for rpc scan
284  final LongAdder rpcScanRequestCount = new LongAdder();
285
286  // Request counter for rpc multi
287  final LongAdder rpcMultiRequestCount = new LongAdder();
288
289  // Request counter for rpc mutate
290  final LongAdder rpcMutateRequestCount = new LongAdder();
291
292  // Server to handle client requests.
293  final RpcServerInterface rpcServer;
294  final InetSocketAddress isa;
295
296  private final HRegionServer regionServer;
297  private final long maxScannerResultSize;
298
299  // The reference to the priority extraction function
300  private final PriorityFunction priority;
301
302  private ScannerIdGenerator scannerIdGenerator;
303  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Holds the name of a closed scanner for a while. This keeps us compatible with old clients,
  // which may send a next or close request to a region scanner that has already been exhausted.
  // The entries are removed automatically after scannerLeaseTimeoutPeriod.
307  private final Cache<String, String> closedScanners;
308  /**
309   * The lease timeout period for client scanners (milliseconds).
310   */
311  private final int scannerLeaseTimeoutPeriod;
312
313  /**
314   * The RPC timeout period (milliseconds)
315   */
316  private final int rpcTimeout;
317
318  /**
319   * The minimum allowable delta to use for the scan limit
320   */
321  private final long minimumScanTimeLimitDelta;
322
323  /**
324   * Row size threshold for multi requests above which a warning is logged
325   */
326  private final int rowSizeWarnThreshold;
327
328  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);
329
330  // We want to vet all accesses at the point of entry itself; limiting scope of access checker
331  // instance to only this class to prevent its use from spreading deeper into implementation.
332  // Initialized in start() since AccessChecker needs ZKWatcher which is created by HRegionServer
333  // after RSRpcServices constructor and before start() is called.
334  // Initialized only if authorization is enabled, else remains null.
335  protected AccessChecker accessChecker;
336
337  /**
338   * Services launched in RSRpcServices. By default they are on but you can use the below
339   * booleans to selectively enable/disable either Admin or Client Service (Rare is the case
340   * where you would ever turn off one or the other).
341   */
342  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
343      "hbase.regionserver.admin.executorService";
344  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
345      "hbase.regionserver.client.executorService";
346
347  /**
348   * An Rpc callback for closing a RegionScanner.
349   */
350  private static final class RegionScannerCloseCallBack implements RpcCallback {
351
352    private final RegionScanner scanner;
353
354    public RegionScannerCloseCallBack(RegionScanner scanner) {
355      this.scanner = scanner;
356    }
357
358    @Override
359    public void run() throws IOException {
360      this.scanner.close();
361    }
362  }
363
364  /**
365   * An Rpc callback for doing shipped() call on a RegionScanner.
366   */
367  private class RegionScannerShippedCallBack implements RpcCallback {
368
369    private final String scannerName;
370    private final Shipper shipper;
371    private final Lease lease;
372
373    public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
374      this.scannerName = scannerName;
375      this.shipper = shipper;
376      this.lease = lease;
377    }
378
379    @Override
380    public void run() throws IOException {
381      this.shipper.shipped();
382      // We're done. On way out re-add the above removed lease. The lease was temp removed for this
383      // Rpc call and we are at end of the call now. Time to add it back.
384      if (scanners.containsKey(scannerName)) {
385        if (lease != null) regionServer.leases.addLease(lease);
386      }
387    }
388  }
389
390  /**
391   * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
392   * completion of multiGets.
393   */
394   static class RegionScannersCloseCallBack implements RpcCallback {
395    private final List<RegionScanner> scanners = new ArrayList<>();
396
397    public void addScanner(RegionScanner scanner) {
398      this.scanners.add(scanner);
399    }
400
401    @Override
402    public void run() {
403      for (RegionScanner scanner : scanners) {
404        try {
405          scanner.close();
406        } catch (IOException e) {
407          LOG.error("Exception while closing the scanner " + scanner, e);
408        }
409      }
410    }
411  }
412
413  /**
414   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
415   */
416  private static final class RegionScannerHolder {
417
418    private final AtomicLong nextCallSeq = new AtomicLong(0);
419    private final String scannerName;
420    private final RegionScanner s;
421    private final HRegion r;
422    private final RpcCallback closeCallBack;
423    private final RpcCallback shippedCallback;
424    private byte[] rowOfLastPartialResult;
425    private boolean needCursor;
426
427    public RegionScannerHolder(String scannerName, RegionScanner s, HRegion r,
428        RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor) {
429      this.scannerName = scannerName;
430      this.s = s;
431      this.r = r;
432      this.closeCallBack = closeCallBack;
433      this.shippedCallback = shippedCallback;
434      this.needCursor = needCursor;
435    }
436
437    public long getNextCallSeq() {
438      return nextCallSeq.get();
439    }
440
441    public boolean incNextCallSeq(long currentSeq) {
442      // Use CAS to prevent multiple scan request running on the same scanner.
443      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
444    }
445  }
446
447  /**
448   * Instantiated as a scanner lease. If the lease times out, the scanner is
449   * closed
450   */
451  private class ScannerListener implements LeaseListener {
452    private final String scannerName;
453
454    ScannerListener(final String n) {
455      this.scannerName = n;
456    }
457
458    @Override
459    public void leaseExpired() {
460      RegionScannerHolder rsh = scanners.remove(this.scannerName);
461      if (rsh != null) {
462        RegionScanner s = rsh.s;
463        LOG.info("Scanner " + this.scannerName + " lease expired on region "
464          + s.getRegionInfo().getRegionNameAsString());
465        HRegion region = null;
466        try {
467          region = regionServer.getRegion(s.getRegionInfo().getRegionName());
468          if (region != null && region.getCoprocessorHost() != null) {
469            region.getCoprocessorHost().preScannerClose(s);
470          }
471        } catch (IOException e) {
472          LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
473        } finally {
474          try {
475            s.close();
476            if (region != null && region.getCoprocessorHost() != null) {
477              region.getCoprocessorHost().postScannerClose(s);
478            }
479          } catch (IOException e) {
480            LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
481          }
482        }
483      } else {
484        LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
485          " scanner found, hence no chance to close that related scanner!");
486      }
487    }
488  }
489
490  private static ResultOrException getResultOrException(final ClientProtos.Result r,
491                                                        final int index){
492    return getResultOrException(ResponseConverter.buildActionResult(r), index);
493  }
494
495  private static ResultOrException getResultOrException(final Exception e, final int index) {
496    return getResultOrException(ResponseConverter.buildActionResult(e), index);
497  }
498
499  private static ResultOrException getResultOrException(
500      final ResultOrException.Builder builder, final int index) {
501    return builder.setIndex(index).build();
502  }
503
504  /**
505   * Checks for the following pre-checks in order:
506   * <ol>
507   *   <li>RegionServer is running</li>
508   *   <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
509   * </ol>
510   * @param requestName name of rpc request. Used in reporting failures to provide context.
511   * @throws ServiceException If any of the above listed pre-check fails.
512   */
513  private void rpcPreCheck(String requestName) throws ServiceException {
514    try {
515      checkOpen();
516      requirePermission(requestName, Permission.Action.ADMIN);
517    } catch (IOException ioe) {
518      throw new ServiceException(ioe);
519    }
520  }
521
522  /**
523   * Starts the nonce operation for a mutation, if needed.
524   * @param mutation Mutation.
525   * @param nonceGroup Nonce group from the request.
526   * @return whether to proceed this mutation.
527   */
528  private boolean startNonceOperation(final MutationProto mutation, long nonceGroup)
529      throws IOException {
530    if (regionServer.nonceManager == null || !mutation.hasNonce()) return true;
531    boolean canProceed = false;
532    try {
533      canProceed = regionServer.nonceManager.startOperation(
534        nonceGroup, mutation.getNonce(), regionServer);
535    } catch (InterruptedException ex) {
536      throw new InterruptedIOException("Nonce start operation interrupted");
537    }
538    return canProceed;
539  }
540
541  /**
542   * Ends nonce operation for a mutation, if needed.
543   * @param mutation Mutation.
544   * @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
545   * @param success Whether the operation for this nonce has succeeded.
546   */
547  private void endNonceOperation(final MutationProto mutation,
548      long nonceGroup, boolean success) {
549    if (regionServer.nonceManager != null && mutation.hasNonce()) {
550      regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
551    }
552  }
553
554  private boolean isClientCellBlockSupport(RpcCallContext context) {
555    return context != null && context.isClientCellBlockSupported();
556  }
557
558  private void addResult(final MutateResponse.Builder builder, final Result result,
559      final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
560    if (result == null) return;
561    if (clientCellBlockSupported) {
562      builder.setResult(ProtobufUtil.toResultNoData(result));
563      rpcc.setCellScanner(result.cellScanner());
564    } else {
565      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
566      builder.setResult(pbr);
567    }
568  }
569
570  private void addResults(ScanResponse.Builder builder, List<Result> results,
571      HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
572    builder.setStale(!isDefaultRegion);
573    if (results.isEmpty()) {
574      return;
575    }
576    if (clientCellBlockSupported) {
577      for (Result res : results) {
578        builder.addCellsPerResult(res.size());
579        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
580      }
581      controller.setCellScanner(CellUtil.createCellScanner(results));
582    } else {
583      for (Result res : results) {
584        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
585        builder.addResults(pbr);
586      }
587    }
588  }
589
590  /**
591   * Mutate a list of rows atomically.
592   * @param cellScanner if non-null, the mutation data -- the Cell content.
593   */
594  private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
595    final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
596    ByteArrayComparable comparator, TimeRange timeRange, RegionActionResult.Builder builder,
597    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
598    int countOfCompleteMutation = 0;
599    try {
600      if (!region.getRegionInfo().isMetaRegion()) {
601        regionServer.cacheFlusher.reclaimMemStoreMemory();
602      }
603      RowMutations rm = null;
604      int i = 0;
605      ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
606        ClientProtos.ResultOrException.newBuilder();
607      for (ClientProtos.Action action: actions) {
608        if (action.hasGet()) {
609          throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
610            action.getGet());
611        }
612        MutationType type = action.getMutation().getMutateType();
613        if (rm == null) {
614          rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size());
615        }
616        switch (type) {
617          case PUT:
618            Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
619            ++countOfCompleteMutation;
620            checkCellSizeLimit(region, put);
621            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
622            rm.add(put);
623            break;
624          case DELETE:
625            Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
626            ++countOfCompleteMutation;
627            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
628            rm.add(del);
629            break;
630          default:
631            throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
632        }
633        // To unify the response format with doNonAtomicRegionMutation and read through client's
634        // AsyncProcess we have to add an empty result instance per operation
635        resultOrExceptionOrBuilder.clear();
636        resultOrExceptionOrBuilder.setIndex(i++);
637        builder.addResultOrException(
638          resultOrExceptionOrBuilder.build());
639      }
640      return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm);
641    } finally {
642      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
643      // even if the malformed cells are not skipped.
644      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
645        skipCellsForMutation(actions.get(i), cellScanner);
646      }
647    }
648  }
649
650  /**
651   * Execute an append mutation.
652   *
653   * @return result to return to client if default operation should be
654   * bypassed as indicated by RegionObserver, null otherwise
655   */
656  private Result append(final HRegion region, final OperationQuota quota,
657      final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
658      ActivePolicyEnforcement spaceQuota)
659      throws IOException {
660    long before = EnvironmentEdgeManager.currentTime();
661    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
662    checkCellSizeLimit(region, append);
663    spaceQuota.getPolicyEnforcement(region).check(append);
664    quota.addMutation(append);
665    Result r = null;
666    if (region.getCoprocessorHost() != null) {
667      r = region.getCoprocessorHost().preAppend(append);
668    }
669    if (r == null) {
670      boolean canProceed = startNonceOperation(mutation, nonceGroup);
671      boolean success = false;
672      try {
673        long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
674        if (canProceed) {
675          r = region.append(append, nonceGroup, nonce);
676        } else {
677          // convert duplicate append to get
678          List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false,
679              nonceGroup, nonce);
680          r = Result.create(results);
681        }
682        success = true;
683      } finally {
684        if (canProceed) {
685          endNonceOperation(mutation, nonceGroup, success);
686        }
687      }
688      if (region.getCoprocessorHost() != null) {
689        r = region.getCoprocessorHost().postAppend(append, r);
690      }
691    }
692    if (regionServer.metricsRegionServer != null) {
693      regionServer.metricsRegionServer.updateAppend(
694          region.getTableDescriptor().getTableName(),
695        EnvironmentEdgeManager.currentTime() - before);
696    }
697    return r == null ? Result.EMPTY_RESULT : r;
698  }
699
700  /**
701   * Execute an increment mutation.
702   *
703   * @param region
704   * @param mutation
705   * @return the Result
706   * @throws IOException
707   */
708  private Result increment(final HRegion region, final OperationQuota quota,
709      final MutationProto mutation, final CellScanner cells, long nonceGroup,
710      ActivePolicyEnforcement spaceQuota)
711      throws IOException {
712    long before = EnvironmentEdgeManager.currentTime();
713    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
714    checkCellSizeLimit(region, increment);
715    spaceQuota.getPolicyEnforcement(region).check(increment);
716    quota.addMutation(increment);
717    Result r = null;
718    if (region.getCoprocessorHost() != null) {
719      r = region.getCoprocessorHost().preIncrement(increment);
720    }
721    if (r == null) {
722      boolean canProceed = startNonceOperation(mutation, nonceGroup);
723      boolean success = false;
724      try {
725        long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
726        if (canProceed) {
727          r = region.increment(increment, nonceGroup, nonce);
728        } else {
729          // convert duplicate increment to get
730          List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup,
731              nonce);
732          r = Result.create(results);
733        }
734        success = true;
735      } finally {
736        if (canProceed) {
737          endNonceOperation(mutation, nonceGroup, success);
738        }
739      }
740      if (region.getCoprocessorHost() != null) {
741        r = region.getCoprocessorHost().postIncrement(increment, r);
742      }
743    }
744    if (regionServer.metricsRegionServer != null) {
745      regionServer.metricsRegionServer.updateIncrement(
746          region.getTableDescriptor().getTableName(),
747          EnvironmentEdgeManager.currentTime() - before);
748    }
749    return r == null ? Result.EMPTY_RESULT : r;
750  }
751
752  /**
753   * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
754   * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
755   * @param cellsToReturn  Could be null. May be allocated in this method.  This is what this
756   * method returns as a 'result'.
757   * @param closeCallBack the callback to be used with multigets
758   * @param context the current RpcCallContext
759   * @return Return the <code>cellScanner</code> passed
760   */
761  private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
762      final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
763      final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
764      final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
765      ActivePolicyEnforcement spaceQuotaEnforcement) {
766    // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than do
767    // one at a time, we instead pass them in batch.  Be aware that the corresponding
768    // ResultOrException instance that matches each Put or Delete is then added down in the
769    // doNonAtomicBatchOp call.  We should be staying aligned though the Put and Delete are
770    // deferred/batched
771    List<ClientProtos.Action> mutations = null;
772    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
773    IOException sizeIOE = null;
774    Object lastBlock = null;
775    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = ResultOrException.newBuilder();
776    boolean hasResultOrException = false;
777    for (ClientProtos.Action action : actions.getActionList()) {
778      hasResultOrException = false;
779      resultOrExceptionBuilder.clear();
780      try {
781        Result r = null;
782
783        if (context != null
784            && context.isRetryImmediatelySupported()
785            && (context.getResponseCellSize() > maxQuotaResultSize
786              || context.getResponseBlockSize() + context.getResponseExceptionSize()
787              > maxQuotaResultSize)) {
788
789          // We're storing the exception since the exception and reason string won't
790          // change after the response size limit is reached.
791          if (sizeIOE == null ) {
792            // We don't need the stack un-winding do don't throw the exception.
793            // Throwing will kill the JVM's JIT.
794            //
795            // Instead just create the exception and then store it.
796            sizeIOE = new MultiActionResultTooLarge("Max size exceeded"
797                + " CellSize: " + context.getResponseCellSize()
798                + " BlockSize: " + context.getResponseBlockSize());
799
800            // Only report the exception once since there's only one request that
801            // caused the exception. Otherwise this number will dominate the exceptions count.
802            rpcServer.getMetrics().exception(sizeIOE);
803          }
804
805          // Now that there's an exception is known to be created
806          // use it for the response.
807          //
808          // This will create a copy in the builder.
809          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
810          resultOrExceptionBuilder.setException(pair);
811          context.incrementResponseExceptionSize(pair.getSerializedSize());
812          resultOrExceptionBuilder.setIndex(action.getIndex());
813          builder.addResultOrException(resultOrExceptionBuilder.build());
814          skipCellsForMutation(action, cellScanner);
815          continue;
816        }
817        if (action.hasGet()) {
818          long before = EnvironmentEdgeManager.currentTime();
819          ClientProtos.Get pbGet = action.getGet();
820          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
821          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
822          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
823          // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
824          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
825            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? " +
826                "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " +
827                "reverse Scan.");
828          }
829          try {
830            Get get = ProtobufUtil.toGet(pbGet);
831            if (context != null) {
832              r = get(get, (region), closeCallBack, context);
833            } else {
834              r = region.get(get);
835            }
836          } finally {
837            if (regionServer.metricsRegionServer != null) {
838              regionServer.metricsRegionServer.updateGet(
839                  region.getTableDescriptor().getTableName(),
840                  EnvironmentEdgeManager.currentTime() - before);
841            }
842          }
843        } else if (action.hasServiceCall()) {
844          hasResultOrException = true;
845          com.google.protobuf.Message result =
846            execServiceOnRegion(region, action.getServiceCall());
847          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
848            ClientProtos.CoprocessorServiceResult.newBuilder();
849          resultOrExceptionBuilder.setServiceResult(
850            serviceResultBuilder.setValue(
851              serviceResultBuilder.getValueBuilder()
852                .setName(result.getClass().getName())
853                // TODO: Copy!!!
854                .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
855        } else if (action.hasMutation()) {
856          MutationType type = action.getMutation().getMutateType();
857          if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
858              !mutations.isEmpty()) {
859            // Flush out any Puts or Deletes already collected.
860            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
861              spaceQuotaEnforcement);
862            mutations.clear();
863          }
864          switch (type) {
865            case APPEND:
866              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
867                  spaceQuotaEnforcement);
868              break;
869            case INCREMENT:
870              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
871                  spaceQuotaEnforcement);
872              break;
873            case PUT:
874            case DELETE:
875              // Collect the individual mutations and apply in a batch
876              if (mutations == null) {
877                mutations = new ArrayList<>(actions.getActionCount());
878              }
879              mutations.add(action);
880              break;
881            default:
882              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
883          }
884        } else {
885          throw new HBaseIOException("Unexpected Action type");
886        }
887        if (r != null) {
888          ClientProtos.Result pbResult = null;
889          if (isClientCellBlockSupport(context)) {
890            pbResult = ProtobufUtil.toResultNoData(r);
891            //  Hard to guess the size here.  Just make a rough guess.
892            if (cellsToReturn == null) {
893              cellsToReturn = new ArrayList<>();
894            }
895            cellsToReturn.add(r);
896          } else {
897            pbResult = ProtobufUtil.toResult(r);
898          }
899          lastBlock = addSize(context, r, lastBlock);
900          hasResultOrException = true;
901          resultOrExceptionBuilder.setResult(pbResult);
902        }
903        // Could get to here and there was no result and no exception.  Presumes we added
904        // a Put or Delete to the collecting Mutations List for adding later.  In this
905        // case the corresponding ResultOrException instance for the Put or Delete will be added
906        // down in the doNonAtomicBatchOp method call rather than up here.
907      } catch (IOException ie) {
908        rpcServer.getMetrics().exception(ie);
909        hasResultOrException = true;
910        NameBytesPair pair = ResponseConverter.buildException(ie);
911        resultOrExceptionBuilder.setException(pair);
912        context.incrementResponseExceptionSize(pair.getSerializedSize());
913      }
914      if (hasResultOrException) {
915        // Propagate index.
916        resultOrExceptionBuilder.setIndex(action.getIndex());
917        builder.addResultOrException(resultOrExceptionBuilder.build());
918      }
919    }
920    // Finish up any outstanding mutations
921    if (!CollectionUtils.isEmpty(mutations)) {
922      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
923    }
924    return cellsToReturn;
925  }
926
927  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
928    if (r.maxCellSize > 0) {
929      CellScanner cells = m.cellScanner();
930      while (cells.advance()) {
931        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
932        if (size > r.maxCellSize) {
933          String msg = "Cell with size " + size + " exceeds limit of " + r.maxCellSize + " bytes";
934          if (LOG.isDebugEnabled()) {
935            LOG.debug(msg);
936          }
937          throw new DoNotRetryIOException(msg);
938        }
939      }
940    }
941  }
942
943  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
944    final OperationQuota quota, final List<ClientProtos.Action> mutations,
945    final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement)
946    throws IOException {
947    // Just throw the exception. The exception will be caught and then added to region-level
948    // exception for RegionAction. Leaving the null to action result is ok since the null
949    // result is viewed as failure by hbase client. And the region-lever exception will be used
950    // to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and
951    // AsyncBatchRpcRetryingCaller#onComplete for more details.
952    doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, true);
953  }
954
955  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
956    final OperationQuota quota, final List<ClientProtos.Action> mutations,
957    final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
958    try {
959      doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, false);
960    } catch (IOException e) {
961      // Set the exception for each action. The mutations in same RegionAction are group to
962      // different batch and then be processed individually. Hence, we don't set the region-level
963      // exception here for whole RegionAction.
964      for (Action mutation : mutations) {
965        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
966      }
967    }
968  }
969
970  /**
971   * Execute a list of Put/Delete mutations.
972   *
973   * @param builder
974   * @param region
975   * @param mutations
976   */
977  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
978      final OperationQuota quota, final List<ClientProtos.Action> mutations,
979      final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
980      throws IOException {
981    Mutation[] mArray = new Mutation[mutations.size()];
982    long before = EnvironmentEdgeManager.currentTime();
983    boolean batchContainsPuts = false, batchContainsDelete = false;
984    try {
985      /** HBASE-17924
986       * mutationActionMap is a map to map the relation between mutations and actions
987       * since mutation array may have been reoredered.In order to return the right
988       * result or exception to the corresponding actions, We need to know which action
989       * is the mutation belong to. We can't sort ClientProtos.Action array, since they
990       * are bonded to cellscanners.
991       */
992      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
993      int i = 0;
994      for (ClientProtos.Action action: mutations) {
995        if (action.hasGet()) {
996          throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
997            action.getGet());
998        }
999        MutationProto m = action.getMutation();
1000        Mutation mutation;
1001        if (m.getMutateType() == MutationType.PUT) {
1002          mutation = ProtobufUtil.toPut(m, cells);
1003          batchContainsPuts = true;
1004        } else {
1005          mutation = ProtobufUtil.toDelete(m, cells);
1006          batchContainsDelete = true;
1007        }
1008        mutationActionMap.put(mutation, action);
1009        mArray[i++] = mutation;
1010        checkCellSizeLimit(region, mutation);
1011        // Check if a space quota disallows this mutation
1012        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
1013        quota.addMutation(mutation);
1014      }
1015
1016      if (!region.getRegionInfo().isMetaRegion()) {
1017        regionServer.cacheFlusher.reclaimMemStoreMemory();
1018      }
1019
1020      // HBASE-17924
1021      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
1022      // order is preserved as its expected from the client
1023      if (!atomic) {
1024        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
1025      }
1026
1027      OperationStatus[] codes = region.batchMutate(mArray, atomic, HConstants.NO_NONCE,
1028        HConstants.NO_NONCE);
1029      for (i = 0; i < codes.length; i++) {
1030        Mutation currentMutation = mArray[i];
1031        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
1032        int index = currentAction.hasIndex() || !atomic ? currentAction.getIndex() : i;
1033        Exception e = null;
1034        switch (codes[i].getOperationStatusCode()) {
1035          case BAD_FAMILY:
1036            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
1037            builder.addResultOrException(getResultOrException(e, index));
1038            break;
1039
1040          case SANITY_CHECK_FAILURE:
1041            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
1042            builder.addResultOrException(getResultOrException(e, index));
1043            break;
1044
1045          default:
1046            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
1047            builder.addResultOrException(getResultOrException(e, index));
1048            break;
1049
1050          case SUCCESS:
1051            builder.addResultOrException(getResultOrException(
1052              ClientProtos.Result.getDefaultInstance(), index));
1053            break;
1054        }
1055      }
1056    } finally {
1057      int processedMutationIndex = 0;
1058      for (Action mutation : mutations) {
1059        // The non-null mArray[i] means the cell scanner has been read.
1060        if (mArray[processedMutationIndex++] == null) {
1061          skipCellsForMutation(mutation, cells);
1062        }
1063      }
1064      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1065    }
1066  }
1067
1068  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
1069    boolean batchContainsDelete) {
1070    if (regionServer.metricsRegionServer != null) {
1071      long after = EnvironmentEdgeManager.currentTime();
1072      if (batchContainsPuts) {
1073        regionServer.metricsRegionServer
1074          .updatePutBatch(region.getTableDescriptor().getTableName(), after - starttime);
1075      }
1076      if (batchContainsDelete) {
1077        regionServer.metricsRegionServer
1078          .updateDeleteBatch(region.getTableDescriptor().getTableName(), after - starttime);
1079      }
1080    }
1081  }
1082
1083  /**
1084   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
1085   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
1086   * @param region
1087   * @param mutations
1088   * @param replaySeqId
1089   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
1090   *         exceptionMessage if any
1091   * @throws IOException
1092   */
1093  private OperationStatus [] doReplayBatchOp(final HRegion region,
1094      final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
1095    long before = EnvironmentEdgeManager.currentTime();
1096    boolean batchContainsPuts = false, batchContainsDelete = false;
1097    try {
1098      for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
1099        WALSplitter.MutationReplay m = it.next();
1100
1101        if (m.type == MutationType.PUT) {
1102          batchContainsPuts = true;
1103        } else {
1104          batchContainsDelete = true;
1105        }
1106
1107        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
1108        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
1109        if (metaCells != null && !metaCells.isEmpty()) {
1110          for (Cell metaCell : metaCells) {
1111            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
1112            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1113            HRegion hRegion = region;
1114            if (compactionDesc != null) {
1115              // replay the compaction. Remove the files from stores only if we are the primary
1116              // region replica (thus own the files)
1117              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
1118                replaySeqId);
1119              continue;
1120            }
1121            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
1122            if (flushDesc != null && !isDefaultReplica) {
1123              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
1124              continue;
1125            }
1126            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
1127            if (regionEvent != null && !isDefaultReplica) {
1128              hRegion.replayWALRegionEventMarker(regionEvent);
1129              continue;
1130            }
1131            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
1132            if (bulkLoadEvent != null) {
1133              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
1134              continue;
1135            }
1136          }
1137          it.remove();
1138        }
1139      }
1140      requestCount.increment();
1141      if (!region.getRegionInfo().isMetaRegion()) {
1142        regionServer.cacheFlusher.reclaimMemStoreMemory();
1143      }
1144      return region.batchReplay(mutations.toArray(
1145        new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
1146    } finally {
1147      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1148    }
1149  }
1150
1151  private void closeAllScanners() {
1152    // Close any outstanding scanners. Means they'll get an UnknownScanner
1153    // exception next time they come in.
1154    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
1155      try {
1156        e.getValue().s.close();
1157      } catch (IOException ioe) {
1158        LOG.warn("Closing scanner " + e.getKey(), ioe);
1159      }
1160    }
1161  }
1162
/**
 * Sink for the warning about an oversized batch request. Exposed for testing.
 */
@FunctionalInterface
interface LogDelegate {
  /**
   * Reports a batch whose requested row count went over the warning threshold.
   *
   * @param firstRegionName name of the first region in the multi request
   * @param sum the requested number of rows
   * @param rowSizeWarnThreshold the threshold that was exceeded
   */
  void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThreshold);
}
1167
1168  private static LogDelegate DEFAULT_LOG_DELEGATE = new LogDelegate() {
1169    @Override
1170    public void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThreshold) {
1171      if (LOG.isWarnEnabled()) {
1172        LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
1173            + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
1174            + RpcServer.getRequestUserName().orElse(null) + "/"
1175            + RpcServer.getRemoteAddress().orElse(null)
1176            + " first region in multi=" + firstRegionName);
1177      }
1178    }
1179  };
1180
1181  private final LogDelegate ld;
1182
/**
 * Creates the RPC services for the given region server, reporting large-batch warnings
 * through the default log delegate.
 *
 * @param rs the region server these services belong to
 * @throws IOException if the delegated constructor fails
 */
public RSRpcServices(HRegionServer rs) throws IOException {
  this(rs, DEFAULT_LOG_DELEGATE);
}
1186
1187  // Directly invoked only for testing
1188  RSRpcServices(HRegionServer rs, LogDelegate ld) throws IOException {
1189    this.ld = ld;
1190    regionServer = rs;
1191    rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
1192    RpcSchedulerFactory rpcSchedulerFactory;
1193    try {
1194      Class<?> cls = rs.conf.getClass(
1195          REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
1196          SimpleRpcSchedulerFactory.class);
1197      rpcSchedulerFactory = cls.asSubclass(RpcSchedulerFactory.class)
1198          .getDeclaredConstructor().newInstance();
1199    } catch (NoSuchMethodException | InvocationTargetException |
1200        InstantiationException | IllegalAccessException e) {
1201      throw new IllegalArgumentException(e);
1202    }
1203    // Server to handle client requests.
1204    InetSocketAddress initialIsa;
1205    InetSocketAddress bindAddress;
1206    if(this instanceof MasterRpcServices) {
1207      String hostname = getHostname(rs.conf, true);
1208      int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
1209      // Creation of a HSA will force a resolve.
1210      initialIsa = new InetSocketAddress(hostname, port);
1211      bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port);
1212    } else {
1213      String hostname = getHostname(rs.conf, false);
1214      int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT,
1215        HConstants.DEFAULT_REGIONSERVER_PORT);
1216      // Creation of a HSA will force a resolve.
1217      initialIsa = new InetSocketAddress(hostname, port);
1218      bindAddress = new InetSocketAddress(
1219        rs.conf.get("hbase.regionserver.ipc.address", hostname), port);
1220    }
1221    if (initialIsa.getAddress() == null) {
1222      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
1223    }
1224    priority = createPriority();
1225    // Using Address means we don't get the IP too. Shorten it more even to just the host name
1226    // w/o the domain.
1227    String name = rs.getProcessName() + "/" +
1228        Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain();
1229    // Set how many times to retry talking to another server over Connection.
1230    ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
1231    rpcServer = createRpcServer(rs, rs.conf, rpcSchedulerFactory, bindAddress, name);
1232    rpcServer.setRsRpcServices(this);
1233    scannerLeaseTimeoutPeriod = rs.conf.getInt(
1234      HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
1235      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
1236    maxScannerResultSize = rs.conf.getLong(
1237      HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
1238      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
1239    rpcTimeout = rs.conf.getInt(
1240      HConstants.HBASE_RPC_TIMEOUT_KEY,
1241      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
1242    minimumScanTimeLimitDelta = rs.conf.getLong(
1243      REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
1244      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
1245
1246    InetSocketAddress address = rpcServer.getListenerAddress();
1247    if (address == null) {
1248      throw new IOException("Listener channel is closed");
1249    }
1250    // Set our address, however we need the final port that was given to rpcServer
1251    isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
1252    rpcServer.setErrorHandler(this);
1253    rs.setName(name);
1254
1255    closedScanners = CacheBuilder.newBuilder()
1256        .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
1257  }
1258
1259  protected RpcServerInterface createRpcServer(Server server, Configuration conf,
1260      RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name)
1261      throws IOException {
1262    boolean reservoirEnabled = conf.getBoolean(RESERVOIR_ENABLED_KEY, true);
1263    try {
1264      return RpcServerFactory.createRpcServer(server, name, getServices(),
1265          bindAddress, // use final bindAddress for this server.
1266          conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
1267    } catch (BindException be) {
1268      throw new IOException(be.getMessage() + ". To switch ports use the '"
1269          + HConstants.REGIONSERVER_PORT + "' configuration property.",
1270          be.getCause() != null ? be.getCause() : be);
1271    }
1272  }
1273
1274  @Override
1275  public void onConfigurationChange(Configuration newConf) {
1276    if (rpcServer instanceof ConfigurationObserver) {
1277      ((ConfigurationObserver)rpcServer).onConfigurationChange(newConf);
1278    }
1279  }
1280
1281  protected PriorityFunction createPriority() {
1282    return new AnnotationReadingPriorityFunction(this);
1283  }
1284
1285  protected void requirePermission(String request, Permission.Action perm) throws IOException {
1286    if (accessChecker != null) {
1287      accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), request, perm);
1288    }
1289  }
1290
1291
1292  public static String getHostname(Configuration conf, boolean isMaster)
1293      throws UnknownHostException {
1294    String hostname = conf.get(isMaster? HRegionServer.MASTER_HOSTNAME_KEY :
1295      HRegionServer.RS_HOSTNAME_KEY);
1296    if (hostname == null || hostname.isEmpty()) {
1297      String masterOrRS = isMaster ? "master" : "regionserver";
1298      return Strings.domainNamePointerToHostName(DNS.getDefaultHost(
1299        conf.get("hbase." + masterOrRS + ".dns.interface", "default"),
1300        conf.get("hbase." + masterOrRS + ".dns.nameserver", "default")));
1301    } else {
1302      LOG.info("hostname is configured to be " + hostname);
1303      return hostname;
1304    }
1305  }
1306
1307  @VisibleForTesting
1308  public int getScannersCount() {
1309    return scanners.size();
1310  }
1311
1312  public
1313  RegionScanner getScanner(long scannerId) {
1314    String scannerIdString = Long.toString(scannerId);
1315    RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
1316    if (scannerHolder != null) {
1317      return scannerHolder.s;
1318    }
1319    return null;
1320  }
1321
1322  public String getScanDetailsWithId(long scannerId) {
1323    RegionScanner scanner = getScanner(scannerId);
1324    if (scanner == null) {
1325      return null;
1326    }
1327    StringBuilder builder = new StringBuilder();
1328    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
1329    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
1330    return builder.toString();
1331  }
1332
1333  /**
1334   * Get the vtime associated with the scanner.
1335   * Currently the vtime is the number of "next" calls.
1336   */
1337  long getScannerVirtualTime(long scannerId) {
1338    String scannerIdString = Long.toString(scannerId);
1339    RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
1340    if (scannerHolder != null) {
1341      return scannerHolder.getNextCallSeq();
1342    }
1343    return 0L;
1344  }
1345
1346  /**
1347   * Method to account for the size of retained cells and retained data blocks.
1348   * @return an object that represents the last referenced block from this response.
1349   */
1350  Object addSize(RpcCallContext context, Result r, Object lastBlock) {
1351    if (context != null && r != null && !r.isEmpty()) {
1352      for (Cell c : r.rawCells()) {
1353        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
1354
1355        // Since byte buffers can point all kinds of crazy places it's harder to keep track
1356        // of which blocks are kept alive by what byte buffer.
1357        // So we make a guess.
1358        if (c instanceof ByteBufferExtendedCell) {
1359          ByteBufferExtendedCell bbCell = (ByteBufferExtendedCell) c;
1360          ByteBuffer bb = bbCell.getValueByteBuffer();
1361          if (bb != lastBlock) {
1362            context.incrementResponseBlockSize(bb.capacity());
1363            lastBlock = bb;
1364          }
1365        } else {
1366          // We're using the last block being the same as the current block as
1367          // a proxy for pointing to a new block. This won't be exact.
1368          // If there are multiple gets that bounce back and forth
1369          // Then it's possible that this will over count the size of
1370          // referenced blocks. However it's better to over count and
1371          // use two rpcs than to OOME the regionserver.
1372          byte[] valueArray = c.getValueArray();
1373          if (valueArray != lastBlock) {
1374            context.incrementResponseBlockSize(valueArray.length);
1375            lastBlock = valueArray;
1376          }
1377        }
1378
1379      }
1380    }
1381    return lastBlock;
1382  }
1383
1384  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
1385      HRegion r, boolean needCursor) throws LeaseStillHeldException {
1386    Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
1387      new ScannerListener(scannerName));
1388    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
1389    RpcCallback closeCallback;
1390    if (s instanceof RpcCallback) {
1391      closeCallback = (RpcCallback) s;
1392    } else {
1393      closeCallback = new RegionScannerCloseCallBack(s);
1394    }
1395    RegionScannerHolder rsh =
1396        new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback, needCursor);
1397    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
1398    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! " +
1399      scannerName;
1400    return rsh;
1401  }
1402
1403  /**
1404   * Find the HRegion based on a region specifier
1405   *
1406   * @param regionSpecifier the region specifier
1407   * @return the corresponding region
1408   * @throws IOException if the specifier is not null,
1409   *    but failed to find the region
1410   */
1411  @VisibleForTesting
1412  public HRegion getRegion(
1413      final RegionSpecifier regionSpecifier) throws IOException {
1414    return regionServer.getRegion(regionSpecifier.getValue().toByteArray());
1415  }
1416
1417  /**
1418   * Find the List of HRegions based on a list of region specifiers
1419   *
1420   * @param regionSpecifiers the list of region specifiers
1421   * @return the corresponding list of regions
1422   * @throws IOException if any of the specifiers is not null,
1423   *    but failed to find the region
1424   */
1425  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
1426      final CacheEvictionStatsBuilder stats) {
1427    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
1428    for (RegionSpecifier regionSpecifier: regionSpecifiers) {
1429      try {
1430        regions.add(regionServer.getRegion(regionSpecifier.getValue().toByteArray()));
1431      } catch (NotServingRegionException e) {
1432        stats.addException(regionSpecifier.getValue().toByteArray(), e);
1433      }
1434    }
1435    return regions;
1436  }
1437
1438  @VisibleForTesting
1439  public PriorityFunction getPriority() {
1440    return priority;
1441  }
1442
1443  @VisibleForTesting
1444  public Configuration getConfiguration() {
1445    return regionServer.getConfiguration();
1446  }
1447
1448  private RegionServerRpcQuotaManager getRpcQuotaManager() {
1449    return regionServer.getRegionServerRpcQuotaManager();
1450  }
1451
1452  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
1453    return regionServer.getRegionServerSpaceQuotaManager();
1454  }
1455
1456  void start(ZKWatcher zkWatcher) {
1457    if (AccessChecker.isAuthorizationSupported(getConfiguration())) {
1458      accessChecker = new AccessChecker(getConfiguration(), zkWatcher);
1459    }
1460    this.scannerIdGenerator = new ScannerIdGenerator(this.regionServer.serverName);
1461    rpcServer.start();
1462  }
1463
1464  void stop() {
1465    if (accessChecker != null) {
1466      accessChecker.stop();
1467    }
1468    closeAllScanners();
1469    rpcServer.stop();
1470  }
1471
1472  /**
1473   * Called to verify that this server is up and running.
1474   */
1475  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
1476  protected void checkOpen() throws IOException {
1477    if (regionServer.isAborted()) {
1478      throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
1479    }
1480    if (regionServer.isStopped()) {
1481      throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
1482    }
1483    if (!regionServer.fsOk) {
1484      throw new RegionServerStoppedException("File system not available");
1485    }
1486    if (!regionServer.isOnline()) {
1487      throw new ServerNotRunningYetException("Server " + regionServer.serverName
1488          + " is not running yet");
1489    }
1490  }
1491
1492  /**
1493   * By default, put up an Admin and a Client Service.
1494   * Set booleans <code>hbase.regionserver.admin.executorService</code> and
1495   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
1496   * Default is that both are enabled.
1497   * @return immutable list of blocking services and the security info classes that this server
1498   * supports
1499   */
1500  protected List<BlockingServiceAndInterface> getServices() {
1501    boolean admin =
1502      getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
1503    boolean client =
1504      getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
1505    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
1506    if (client) {
1507      bssi.add(new BlockingServiceAndInterface(
1508      ClientService.newReflectiveBlockingService(this),
1509      ClientService.BlockingInterface.class));
1510    }
1511    if (admin) {
1512      bssi.add(new BlockingServiceAndInterface(
1513      AdminService.newReflectiveBlockingService(this),
1514      AdminService.BlockingInterface.class));
1515    }
1516    return new org.apache.hbase.thirdparty.com.google.common.collect.
1517        ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
1518  }
1519
1520  public InetSocketAddress getSocketAddress() {
1521    return isa;
1522  }
1523
1524  @Override
1525  public int getPriority(RequestHeader header, Message param, User user) {
1526    return priority.getPriority(header, param, user);
1527  }
1528
1529  @Override
1530  public long getDeadline(RequestHeader header, Message param) {
1531    return priority.getDeadline(header, param);
1532  }
1533
1534  /*
1535   * Check if an OOME and, if so, abort immediately to avoid creating more objects.
1536   *
1537   * @param e
1538   *
1539   * @return True if we OOME'd and are aborting.
1540   */
1541  @Override
1542  public boolean checkOOME(final Throwable e) {
1543    return exitIfOOME(e);
1544  }
1545
1546  public static boolean exitIfOOME(final Throwable e ){
1547    boolean stop = false;
1548    try {
1549      if (e instanceof OutOfMemoryError
1550          || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1551          || (e.getMessage() != null && e.getMessage().contains(
1552              "java.lang.OutOfMemoryError"))) {
1553        stop = true;
1554        LOG.error(HBaseMarkers.FATAL, "Run out of memory; "
1555          + RSRpcServices.class.getSimpleName() + " will abort itself immediately",
1556          e);
1557      }
1558    } finally {
1559      if (stop) {
1560        Runtime.getRuntime().halt(1);
1561      }
1562    }
1563    return stop;
1564  }
1565
1566  /**
1567   * Close a region on the region server.
1568   *
1569   * @param controller the RPC controller
1570   * @param request the request
1571   * @throws ServiceException
1572   */
1573  @Override
1574  @QosPriority(priority=HConstants.ADMIN_QOS)
1575  public CloseRegionResponse closeRegion(final RpcController controller,
1576      final CloseRegionRequest request) throws ServiceException {
1577    final ServerName sn = (request.hasDestinationServer() ?
1578      ProtobufUtil.toServerName(request.getDestinationServer()) : null);
1579
1580    try {
1581      checkOpen();
1582      if (request.hasServerStartCode()) {
1583        // check that we are the same server that this RPC is intended for.
1584        long serverStartCode = request.getServerStartCode();
1585        if (regionServer.serverName.getStartcode() !=  serverStartCode) {
1586          throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1587              "different server with startCode: " + serverStartCode + ", this server is: "
1588              + regionServer.serverName));
1589        }
1590      }
1591      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1592
1593      requestCount.increment();
1594      if (sn == null) {
1595        LOG.info("Close " + encodedRegionName + " without moving");
1596      } else {
1597        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1598      }
1599      boolean closed = regionServer.closeRegion(encodedRegionName, false, sn);
1600      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1601      return builder.build();
1602    } catch (IOException ie) {
1603      throw new ServiceException(ie);
1604    }
1605  }
1606
1607  /**
1608   * Compact a region on the region server.
1609   *
1610   * @param controller the RPC controller
1611   * @param request the request
1612   * @throws ServiceException
1613   */
1614  @Override
1615  @QosPriority(priority = HConstants.ADMIN_QOS)
1616  public CompactRegionResponse compactRegion(final RpcController controller,
1617      final CompactRegionRequest request) throws ServiceException {
1618    try {
1619      checkOpen();
1620      requestCount.increment();
1621      HRegion region = getRegion(request.getRegion());
1622      // Quota support is enabled, the requesting user is not system/super user
1623      // and a quota policy is enforced that disables compactions.
1624      if (QuotaUtil.isQuotaEnabled(getConfiguration()) &&
1625          !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null)) &&
1626          this.regionServer.getRegionServerSpaceQuotaManager()
1627              .areCompactionsDisabled(region.getTableDescriptor().getTableName())) {
1628        throw new DoNotRetryIOException(
1629            "Compactions on this region are " + "disabled due to a space quota violation.");
1630      }
1631      region.startRegionOperation(Operation.COMPACT_REGION);
1632      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1633      boolean major = request.hasMajor() && request.getMajor();
1634      if (request.hasFamily()) {
1635        byte[] family = request.getFamily().toByteArray();
1636        String log = "User-triggered " + (major ? "major " : "") + "compaction for region " +
1637            region.getRegionInfo().getRegionNameAsString() + " and family " +
1638            Bytes.toString(family);
1639        LOG.trace(log);
1640        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
1641          CompactionLifeCycleTracker.DUMMY);
1642      } else {
1643        String log = "User-triggered " + (major ? "major " : "") + "compaction for region " +
1644            region.getRegionInfo().getRegionNameAsString();
1645        LOG.trace(log);
1646        region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
1647      }
1648      return CompactRegionResponse.newBuilder().build();
1649    } catch (IOException ie) {
1650      throw new ServiceException(ie);
1651    }
1652  }
1653
1654  /**
1655   * Flush a region on the region server.
1656   *
1657   * @param controller the RPC controller
1658   * @param request the request
1659   * @throws ServiceException
1660   */
1661  @Override
1662  @QosPriority(priority=HConstants.ADMIN_QOS)
1663  public FlushRegionResponse flushRegion(final RpcController controller,
1664      final FlushRegionRequest request) throws ServiceException {
1665    try {
1666      checkOpen();
1667      requestCount.increment();
1668      HRegion region = getRegion(request.getRegion());
1669      LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1670      boolean shouldFlush = true;
1671      if (request.hasIfOlderThanTs()) {
1672        shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1673      }
1674      FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1675      if (shouldFlush) {
1676        boolean writeFlushWalMarker =  request.hasWriteFlushWalMarker() ?
1677            request.getWriteFlushWalMarker() : false;
1678        // Go behind the curtain so we can manage writing of the flush WAL marker
1679        HRegion.FlushResultImpl flushResult =
1680            region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1681        boolean compactionNeeded = flushResult.isCompactionNeeded();
1682        if (compactionNeeded) {
1683          regionServer.compactSplitThread.requestSystemCompaction(region,
1684            "Compaction through user triggered flush");
1685        }
1686        builder.setFlushed(flushResult.isFlushSucceeded());
1687        builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1688      }
1689      builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1690      return builder.build();
1691    } catch (DroppedSnapshotException ex) {
1692      // Cache flush can fail in a few places. If it fails in a critical
1693      // section, we get a DroppedSnapshotException and a replay of wal
1694      // is required. Currently the only way to do this is a restart of
1695      // the server.
1696      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1697      throw new ServiceException(ex);
1698    } catch (IOException ie) {
1699      throw new ServiceException(ie);
1700    }
1701  }
1702
1703  @Override
1704  @QosPriority(priority=HConstants.ADMIN_QOS)
1705  public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1706      final GetOnlineRegionRequest request) throws ServiceException {
1707    try {
1708      checkOpen();
1709      requestCount.increment();
1710      Map<String, HRegion> onlineRegions = regionServer.onlineRegions;
1711      List<RegionInfo> list = new ArrayList<>(onlineRegions.size());
1712      for (HRegion region: onlineRegions.values()) {
1713        list.add(region.getRegionInfo());
1714      }
1715      Collections.sort(list, RegionInfo.COMPARATOR);
1716      return ResponseConverter.buildGetOnlineRegionResponse(list);
1717    } catch (IOException ie) {
1718      throw new ServiceException(ie);
1719    }
1720  }
1721
1722  @Override
1723  @QosPriority(priority=HConstants.ADMIN_QOS)
1724  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
1725      final GetRegionInfoRequest request) throws ServiceException {
1726    try {
1727      checkOpen();
1728      requestCount.increment();
1729      HRegion region = getRegion(request.getRegion());
1730      RegionInfo info = region.getRegionInfo();
1731      byte[] bestSplitRow = null;
1732      boolean shouldSplit = true;
1733      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
1734        HRegion r = region;
1735        region.startRegionOperation(Operation.SPLIT_REGION);
1736        r.forceSplit(null);
1737        // Even after setting force split if split policy says no to split then we should not split.
1738        shouldSplit = region.getSplitPolicy().shouldSplit() && !info.isMetaRegion();
1739        bestSplitRow = r.checkSplit();
1740        // when all table data are in memstore, bestSplitRow = null
1741        // try to flush region first
1742        if(bestSplitRow == null) {
1743          r.flush(true);
1744          bestSplitRow = r.checkSplit();
1745        }
1746        r.clearSplit();
1747      }
1748      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
1749      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
1750      if (request.hasCompactionState() && request.getCompactionState()) {
1751        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
1752      }
1753      builder.setSplittable(region.isSplittable() && shouldSplit);
1754      builder.setMergeable(region.isMergeable());
1755      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
1756        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
1757      }
1758      return builder.build();
1759    } catch (IOException ie) {
1760      throw new ServiceException(ie);
1761    }
1762  }
1763
1764  @Override
1765  @QosPriority(priority=HConstants.ADMIN_QOS)
1766  public GetRegionLoadResponse getRegionLoad(RpcController controller,
1767      GetRegionLoadRequest request) throws ServiceException {
1768
1769    List<HRegion> regions;
1770    if (request.hasTableName()) {
1771      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
1772      regions = regionServer.getRegions(tableName);
1773    } else {
1774      regions = regionServer.getRegions();
1775    }
1776    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
1777    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
1778    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1779
1780    try {
1781      for (HRegion region : regions) {
1782        rLoads.add(regionServer.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
1783      }
1784    } catch (IOException e) {
1785      throw new ServiceException(e);
1786    }
1787    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
1788    builder.addAllRegionLoads(rLoads);
1789    return builder.build();
1790  }
1791
1792  @Override
1793  @QosPriority(priority=HConstants.ADMIN_QOS)
1794  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
1795    ClearCompactionQueuesRequest request) throws ServiceException {
1796    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
1797        + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
1798    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
1799    requestCount.increment();
1800    if (clearCompactionQueues.compareAndSet(false,true)) {
1801      try {
1802        checkOpen();
1803        regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues();
1804        for (String queueName : request.getQueueNameList()) {
1805          LOG.debug("clear " + queueName + " compaction queue");
1806          switch (queueName) {
1807            case "long":
1808              regionServer.compactSplitThread.clearLongCompactionsQueue();
1809              break;
1810            case "short":
1811              regionServer.compactSplitThread.clearShortCompactionsQueue();
1812              break;
1813            default:
1814              LOG.warn("Unknown queue name " + queueName);
1815              throw new IOException("Unknown queue name " + queueName);
1816          }
1817        }
1818        regionServer.getRegionServerCoprocessorHost().postClearCompactionQueues();
1819      } catch (IOException ie) {
1820        throw new ServiceException(ie);
1821      } finally {
1822        clearCompactionQueues.set(false);
1823      }
1824    } else {
1825      LOG.warn("Clear compactions queue is executing by other admin.");
1826    }
1827    return respBuilder.build();
1828  }
1829
1830  /**
1831   * Get some information of the region server.
1832   *
1833   * @param controller the RPC controller
1834   * @param request the request
1835   * @throws ServiceException
1836   */
1837  @Override
1838  @QosPriority(priority=HConstants.ADMIN_QOS)
1839  public GetServerInfoResponse getServerInfo(final RpcController controller,
1840      final GetServerInfoRequest request) throws ServiceException {
1841    try {
1842      checkOpen();
1843    } catch (IOException ie) {
1844      throw new ServiceException(ie);
1845    }
1846    requestCount.increment();
1847    int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
1848    return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
1849  }
1850
1851  @Override
1852  @QosPriority(priority=HConstants.ADMIN_QOS)
1853  public GetStoreFileResponse getStoreFile(final RpcController controller,
1854      final GetStoreFileRequest request) throws ServiceException {
1855    try {
1856      checkOpen();
1857      HRegion region = getRegion(request.getRegion());
1858      requestCount.increment();
1859      Set<byte[]> columnFamilies;
1860      if (request.getFamilyCount() == 0) {
1861        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
1862      } else {
1863        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
1864        for (ByteString cf: request.getFamilyList()) {
1865          columnFamilies.add(cf.toByteArray());
1866        }
1867      }
1868      int nCF = columnFamilies.size();
1869      List<String>  fileList = region.getStoreFileList(
1870        columnFamilies.toArray(new byte[nCF][]));
1871      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1872      builder.addAllStoreFile(fileList);
1873      return builder.build();
1874    } catch (IOException ie) {
1875      throw new ServiceException(ie);
1876    }
1877  }
1878
  /**
   * Open asynchronously a region or a set of regions on the region server.
   *
   * The opening is coordinated by ZooKeeper, and this method requires the znode to be created
   *  before being called. As a consequence, this method should be called only from the master.
   * NOTE(review): no ZooKeeper interaction is visible in this method itself - confirm that the
   * paragraph above still describes the current protocol.
   * <p>
   * The different states a region can be in when the request arrives are:
   * </p><ul>
   *  <li>region not opened: the region opening will start asynchronously.</li>
   *  <li>a close is already in progress: if the region is still online the server is aborted and
   *  the open fails; if it is no longer online the region is marked as opening and the open
   *  proceeds.</li>
   *  <li>an open is already in progress: this new open request will be ignored. This is important
   *  because the Master can do multiple requests if it crashes.</li>
   *  <li>the region is already opened:  this new open request will be ignored.</li>
   *  </ul>
   * <p>
   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
   * errors are put in the response as FAILED_OPENING.
   * </p>
   * <p>
   * If the server is not open yet and the request is for a single region of the meta table, the
   * method waits (up to a quarter of the RPC timeout) for the server to come online before
   * giving up.
   * </p>
   * @param controller the RPC controller
   * @param request the request
   * @return a response with one opening state per region in the request that did not fail with
   *   a ServiceException
   * @throws ServiceException if the start code does not match, the server is not open, or a
   *   single (non bulk) region open fails
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public OpenRegionResponse openRegion(final RpcController controller,
      final OpenRegionRequest request) throws ServiceException {
    requestCount.increment();
    if (request.hasServerStartCode()) {
      // check that we are the same server that this RPC is intended for.
      long serverStartCode = request.getServerStartCode();
      if (regionServer.serverName.getStartcode() !=  serverStartCode) {
        throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
            "different server with startCode: " + serverStartCode + ", this server is: "
            + regionServer.serverName));
      }
    }

    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
    final int regionCount = request.getOpenInfoCount();
    // Per-request cache of table descriptors, keyed by table, so that a table shared by several
    // regions of the request is only looked up once.
    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
    final boolean isBulkAssign = regionCount > 1;
    try {
      checkOpen();
    } catch (IOException ie) {
      // The server is not open. Only a single-region request for the meta table gets a grace
      // period; everything else fails right away with the original exception.
      TableName tableName = null;
      if (regionCount == 1) {
        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri = request.getOpenInfo(0).getRegion();
        if (ri != null) {
          tableName = ProtobufUtil.toTableName(ri.getTableName());
        }
      }
      if (!TableName.META_TABLE_NAME.equals(tableName)) {
        throw new ServiceException(ie);
      }
      // We are assigning meta, wait a little for regionserver to finish initialization.
      int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout
      long endTime = System.currentTimeMillis() + timeout;
      synchronized (regionServer.online) {
        try {
          // Wait in msgInterval slices until the deadline passes, the server stops, or it
          // comes online.
          while (System.currentTimeMillis() <= endTime
              && !regionServer.isStopped() && !regionServer.isOnline()) {
            regionServer.online.wait(regionServer.msgInterval);
          }
          // Re-check: throws if the server still is not open after the wait.
          checkOpen();
        } catch (InterruptedException t) {
          Thread.currentThread().interrupt();
          throw new ServiceException(t);
        } catch (IOException e) {
          throw new ServiceException(e);
        }
      }
    }

    // -1 signals that the master did not send its system time.
    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;

    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableDescriptor htd;
      try {
        String encodedName = region.getEncodedName();
        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
        final HRegion onlineRegion = regionServer.getRegion(encodedName);
        if (onlineRegion != null) {
          // The region is already online. This should not happen any more.
          String error = "Received OPEN for the region:"
            + region.getRegionNameAsString() + ", which is already online";
          LOG.warn(error);
          //regionServer.abort(error);
          //throw new IOException(error);
          builder.addOpeningState(RegionOpeningState.OPENED);
          continue;
        }
        LOG.info("Open " + region.getRegionNameAsString());

        // Mark the region as opening (TRUE) unless an entry already exists. As used below,
        // 'previous' is null when there was no entry, TRUE when an open is already in progress
        // and FALSE when a close is in progress.
        final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent(
          encodedNameBytes, Boolean.TRUE);

        if (Boolean.FALSE.equals(previous)) {
          if (regionServer.getRegion(encodedName) != null) {
            // There is a close in progress. This should not happen any more.
            String error = "Received OPEN for the region:"
              + region.getRegionNameAsString() + ", which we are already trying to CLOSE";
            regionServer.abort(error);
            throw new IOException(error);
          }
          // The region is no longer online, so overwrite the close marker with an open marker.
          regionServer.regionsInTransitionInRS.put(encodedNameBytes, Boolean.TRUE);
        }

        if (Boolean.TRUE.equals(previous)) {
          // An open is in progress. This is supported, but let's log this.
          LOG.info("Receiving OPEN for the region:" +
            region.getRegionNameAsString() + ", which we are already trying to OPEN"
              + " - ignoring this new request for this region.");
        }

        // We are opening this region. If it moves back and forth for whatever reason, we don't
        // want to keep returning the stale moved record while we are opening/if we close again.
        regionServer.removeFromMovedRegions(region.getEncodedName());

        // Only submit an open handler when no open was already in progress.
        if (previous == null || !previous.booleanValue()) {
          htd = htds.get(region.getTable());
          if (htd == null) {
            htd = regionServer.tableDescriptors.get(region.getTable());
            htds.put(region.getTable(), htd);
          }
          if (htd == null) {
            throw new IOException("Missing table descriptor for " + region.getEncodedName());
          }
          // If there is no action in progress, we can submit a specific handler.
          // Need to pass the expected version in the constructor.
          if (regionServer.executorService == null) {
            LOG.info("No executor executorService; skipping open request");
          } else {
            if (region.isMetaRegion()) {
              regionServer.executorService.submit(new OpenMetaHandler(
              regionServer, regionServer, region, htd, masterSystemTime));
            } else {
              if (regionOpenInfo.getFavoredNodesCount() > 0) {
                regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
                regionOpenInfo.getFavoredNodesList());
              }
              // Tables with at least admin priority and system tables get the priority handler.
              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
                regionServer.executorService.submit(new OpenPriorityRegionHandler(
                regionServer, regionServer, region, htd, masterSystemTime));
              } else {
                regionServer.executorService.submit(new OpenRegionHandler(
                regionServer, regionServer, region, htd, masterSystemTime));
              }
            }
          }
        }

        // OPENED is reported as soon as the request has been accepted; the handler submitted
        // above runs on the executor service.
        builder.addOpeningState(RegionOpeningState.OPENED);
      } catch (IOException ie) {
        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
        if (isBulkAssign) {
          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
        } else {
          throw new ServiceException(ie);
        }
      }
    }
    return builder.build();
  }
2045
2046  /**
2047   *  Wamrmup a region on this server.
2048   *
2049   * This method should only be called by Master. It synchrnously opens the region and
2050   * closes the region bringing the most important pages in cache.
2051   * <p>
2052   *
2053   * @param controller the RPC controller
2054   * @param request the request
2055   * @throws ServiceException
2056   */
2057  @Override
2058  public WarmupRegionResponse warmupRegion(final RpcController controller,
2059      final WarmupRegionRequest request) throws ServiceException {
2060
2061    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
2062    TableDescriptor htd;
2063    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
2064
2065    try {
2066      checkOpen();
2067      String encodedName = region.getEncodedName();
2068      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
2069      final HRegion onlineRegion = regionServer.getRegion(encodedName);
2070
2071      if (onlineRegion != null) {
2072        LOG.info("Region already online. Skipping warming up " + region);
2073        return response;
2074      }
2075
2076      if (LOG.isDebugEnabled()) {
2077        LOG.debug("Warming up Region " + region.getRegionNameAsString());
2078      }
2079
2080      htd = regionServer.tableDescriptors.get(region.getTable());
2081
2082      if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
2083        LOG.info("Region is in transition. Skipping warmup " + region);
2084        return response;
2085      }
2086
2087      HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
2088          regionServer.getConfiguration(), regionServer, null);
2089
2090    } catch (IOException ie) {
2091      LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
2092      throw new ServiceException(ie);
2093    }
2094
2095    return response;
2096  }
2097
2098  /**
2099   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
2100   * that the given mutations will be durable on the receiving RS if this method returns without any
2101   * exception.
2102   * @param controller the RPC controller
2103   * @param request the request
2104   * @throws ServiceException
2105   */
2106  @Override
2107  @QosPriority(priority = HConstants.REPLAY_QOS)
2108  public ReplicateWALEntryResponse replay(final RpcController controller,
2109      final ReplicateWALEntryRequest request) throws ServiceException {
2110    long before = EnvironmentEdgeManager.currentTime();
2111    CellScanner cells = ((HBaseRpcController) controller).cellScanner();
2112    try {
2113      checkOpen();
2114      List<WALEntry> entries = request.getEntryList();
2115      if (entries == null || entries.isEmpty()) {
2116        // empty input
2117        return ReplicateWALEntryResponse.newBuilder().build();
2118      }
2119      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
2120      HRegion region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
2121      RegionCoprocessorHost coprocessorHost =
2122          ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
2123            ? region.getCoprocessorHost()
2124            : null; // do not invoke coprocessors if this is a secondary region replica
2125      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();
2126
2127      // Skip adding the edits to WAL if this is a secondary region replica
2128      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
2129      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
2130
2131      for (WALEntry entry : entries) {
2132        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
2133          throw new NotServingRegionException("Replay request contains entries from multiple " +
2134              "regions. First region:" + regionName.toStringUtf8() + " , other region:"
2135              + entry.getKey().getEncodedRegionName());
2136        }
2137        if (regionServer.nonceManager != null && isPrimary) {
2138          long nonceGroup = entry.getKey().hasNonceGroup()
2139            ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
2140          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
2141          regionServer.nonceManager.reportOperationFromWal(
2142              nonceGroup,
2143              nonce,
2144              entry.getKey().getWriteTime());
2145        }
2146        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
2147        List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
2148          cells, walEntry, durability);
2149        if (coprocessorHost != null) {
2150          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
2151          // KeyValue.
2152          if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
2153            walEntry.getSecond())) {
2154            // if bypass this log entry, ignore it ...
2155            continue;
2156          }
2157          walEntries.add(walEntry);
2158        }
2159        if(edits!=null && !edits.isEmpty()) {
2160          // HBASE-17924
2161          // sort to improve lock efficiency
2162          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
2163          long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
2164            entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
2165          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
2166          // check if it's a partial success
2167          for (int i = 0; result != null && i < result.length; i++) {
2168            if (result[i] != OperationStatus.SUCCESS) {
2169              throw new IOException(result[i].getExceptionMsg());
2170            }
2171          }
2172        }
2173      }
2174
2175      //sync wal at the end because ASYNC_WAL is used above
2176      WAL wal = region.getWAL();
2177      if (wal != null) {
2178        wal.sync();
2179      }
2180
2181      if (coprocessorHost != null) {
2182        for (Pair<WALKey, WALEdit> entry : walEntries) {
2183          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
2184            entry.getSecond());
2185        }
2186      }
2187      return ReplicateWALEntryResponse.newBuilder().build();
2188    } catch (IOException ie) {
2189      throw new ServiceException(ie);
2190    } finally {
2191      if (regionServer.metricsRegionServer != null) {
2192        regionServer.metricsRegionServer.updateReplay(
2193          EnvironmentEdgeManager.currentTime() - before);
2194      }
2195    }
2196  }
2197
2198  /**
2199   * Replicate WAL entries on the region server.
2200   *
2201   * @param controller the RPC controller
2202   * @param request the request
2203   * @throws ServiceException
2204   */
2205  @Override
2206  @QosPriority(priority=HConstants.REPLICATION_QOS)
2207  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
2208      final ReplicateWALEntryRequest request) throws ServiceException {
2209    try {
2210      checkOpen();
2211      if (regionServer.replicationSinkHandler != null) {
2212        requestCount.increment();
2213        List<WALEntry> entries = request.getEntryList();
2214        CellScanner cellScanner = ((HBaseRpcController)controller).cellScanner();
2215        regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries();
2216        regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner,
2217          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
2218          request.getSourceHFileArchiveDirPath());
2219        regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries();
2220        return ReplicateWALEntryResponse.newBuilder().build();
2221      } else {
2222        throw new ServiceException("Replication services are not initialized yet");
2223      }
2224    } catch (IOException ie) {
2225      throw new ServiceException(ie);
2226    }
2227  }
2228
2229  /**
2230   * Roll the WAL writer of the region server.
2231   * @param controller the RPC controller
2232   * @param request the request
2233   * @throws ServiceException
2234   */
2235  @Override
2236  public RollWALWriterResponse rollWALWriter(final RpcController controller,
2237      final RollWALWriterRequest request) throws ServiceException {
2238    try {
2239      checkOpen();
2240      requestCount.increment();
2241      regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
2242      regionServer.walRoller.requestRollAll();
2243      regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
2244      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
2245      return builder.build();
2246    } catch (IOException ie) {
2247      throw new ServiceException(ie);
2248    }
2249  }
2250
2251
2252  /**
2253   * Stop the region server.
2254   *
2255   * @param controller the RPC controller
2256   * @param request the request
2257   * @throws ServiceException
2258   */
2259  @Override
2260  @QosPriority(priority=HConstants.ADMIN_QOS)
2261  public StopServerResponse stopServer(final RpcController controller,
2262      final StopServerRequest request) throws ServiceException {
2263    requestCount.increment();
2264    String reason = request.getReason();
2265    regionServer.stop(reason);
2266    return StopServerResponse.newBuilder().build();
2267  }
2268
2269  @Override
2270  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
2271      UpdateFavoredNodesRequest request) throws ServiceException {
2272    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
2273    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
2274    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
2275      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
2276      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
2277        regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
2278          regionUpdateInfo.getFavoredNodesList());
2279      }
2280    }
2281    respBuilder.setResponse(openInfoList.size());
2282    return respBuilder.build();
2283  }
2284
2285  /**
2286   * Atomically bulk load several HFiles into an open region
2287   * @return true if successful, false is failed but recoverably (no action)
2288   * @throws ServiceException if failed unrecoverably
2289   */
2290  @Override
2291  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
2292      final BulkLoadHFileRequest request) throws ServiceException {
2293    long start = EnvironmentEdgeManager.currentTime();
2294    try {
2295      checkOpen();
2296      requestCount.increment();
2297      HRegion region = getRegion(request.getRegion());
2298      Map<byte[], List<Path>> map = null;
2299
2300      // Check to see if this bulk load would exceed the space quota for this table
2301      if (QuotaUtil.isQuotaEnabled(getConfiguration())) {
2302        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
2303        SpaceViolationPolicyEnforcement enforcement = activeSpaceQuotas.getPolicyEnforcement(
2304            region);
2305        if (enforcement != null) {
2306          // Bulk loads must still be atomic. We must enact all or none.
2307          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
2308          for (FamilyPath familyPath : request.getFamilyPathList()) {
2309            filePaths.add(familyPath.getPath());
2310          }
2311          // Check if the batch of files exceeds the current quota
2312          enforcement.checkBulkLoad(regionServer.getFileSystem(), filePaths);
2313        }
2314      }
2315
2316      List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
2317      for (FamilyPath familyPath : request.getFamilyPathList()) {
2318        familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath()));
2319      }
2320      if (!request.hasBulkToken()) {
2321        if (region.getCoprocessorHost() != null) {
2322          region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
2323        }
2324        try {
2325          map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
2326              request.getCopyFile());
2327        } finally {
2328          if (region.getCoprocessorHost() != null) {
2329            region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
2330          }
2331        }
2332      } else {
2333        // secure bulk load
2334        map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
2335      }
2336      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
2337      builder.setLoaded(map != null);
2338      return builder.build();
2339    } catch (IOException ie) {
2340      throw new ServiceException(ie);
2341    } finally {
2342      if (regionServer.metricsRegionServer != null) {
2343        regionServer.metricsRegionServer.updateBulkLoad(
2344            EnvironmentEdgeManager.currentTime() - start);
2345      }
2346    }
2347  }
2348
2349  @Override
2350  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
2351      PrepareBulkLoadRequest request) throws ServiceException {
2352    try {
2353      checkOpen();
2354      requestCount.increment();
2355
2356      HRegion region = getRegion(request.getRegion());
2357
2358      String bulkToken = regionServer.secureBulkLoadManager.prepareBulkLoad(region, request);
2359      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
2360      builder.setBulkToken(bulkToken);
2361      return builder.build();
2362    } catch (IOException ie) {
2363      throw new ServiceException(ie);
2364    }
2365  }
2366
2367  @Override
2368  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
2369      CleanupBulkLoadRequest request) throws ServiceException {
2370    try {
2371      checkOpen();
2372      requestCount.increment();
2373
2374      HRegion region = getRegion(request.getRegion());
2375
2376      regionServer.secureBulkLoadManager.cleanupBulkLoad(region, request);
2377      CleanupBulkLoadResponse response = CleanupBulkLoadResponse.newBuilder().build();
2378      return response;
2379    } catch (IOException ie) {
2380      throw new ServiceException(ie);
2381    }
2382  }
2383
2384  @Override
2385  public CoprocessorServiceResponse execService(final RpcController controller,
2386      final CoprocessorServiceRequest request) throws ServiceException {
2387    try {
2388      checkOpen();
2389      requestCount.increment();
2390      HRegion region = getRegion(request.getRegion());
2391      com.google.protobuf.Message result = execServiceOnRegion(region, request.getCall());
2392      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
2393      builder.setRegion(RequestConverter.buildRegionSpecifier(
2394        RegionSpecifierType.REGION_NAME, region.getRegionInfo().getRegionName()));
2395      // TODO: COPIES!!!!!!
2396      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).
2397        setValue(org.apache.hbase.thirdparty.com.google.protobuf.ByteString.
2398            copyFrom(result.toByteArray())));
2399      return builder.build();
2400    } catch (IOException ie) {
2401      throw new ServiceException(ie);
2402    }
2403  }
2404
2405  private com.google.protobuf.Message execServiceOnRegion(HRegion region,
2406      final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
2407    // ignore the passed in controller (from the serialized call)
2408    ServerRpcController execController = new ServerRpcController();
2409    return region.execService(execController, serviceCall);
2410  }
2411
2412  /**
2413   * Get data from a table.
2414   *
2415   * @param controller the RPC controller
2416   * @param request the get request
2417   * @throws ServiceException
2418   */
2419  @Override
2420  public GetResponse get(final RpcController controller,
2421      final GetRequest request) throws ServiceException {
2422    long before = EnvironmentEdgeManager.currentTime();
2423    OperationQuota quota = null;
2424    HRegion region = null;
2425    try {
2426      checkOpen();
2427      requestCount.increment();
2428      rpcGetRequestCount.increment();
2429      region = getRegion(request.getRegion());
2430
2431      GetResponse.Builder builder = GetResponse.newBuilder();
2432      ClientProtos.Get get = request.getGet();
2433      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
2434      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
2435      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
2436      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
2437      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
2438        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? " +
2439            "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " +
2440            "reverse Scan.");
2441      }
2442      Boolean existence = null;
2443      Result r = null;
2444      RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2445      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
2446
2447      Get clientGet = ProtobufUtil.toGet(get);
2448      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2449        existence = region.getCoprocessorHost().preExists(clientGet);
2450      }
2451      if (existence == null) {
2452        if (context != null) {
2453          r = get(clientGet, (region), null, context);
2454        } else {
2455          // for test purpose
2456          r = region.get(clientGet);
2457        }
2458        if (get.getExistenceOnly()) {
2459          boolean exists = r.getExists();
2460          if (region.getCoprocessorHost() != null) {
2461            exists = region.getCoprocessorHost().postExists(clientGet, exists);
2462          }
2463          existence = exists;
2464        }
2465      }
2466      if (existence != null) {
2467        ClientProtos.Result pbr =
2468            ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
2469        builder.setResult(pbr);
2470      } else if (r != null) {
2471        ClientProtos.Result pbr;
2472        if (isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
2473            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)) {
2474          pbr = ProtobufUtil.toResultNoData(r);
2475          ((HBaseRpcController) controller).setCellScanner(CellUtil.createCellScanner(r
2476              .rawCells()));
2477          addSize(context, r, null);
2478        } else {
2479          pbr = ProtobufUtil.toResult(r);
2480        }
2481        builder.setResult(pbr);
2482      }
2483      //r.cells is null when an table.exists(get) call
2484      if (r != null && r.rawCells() != null) {
2485        quota.addGetResult(r);
2486      }
2487      return builder.build();
2488    } catch (IOException ie) {
2489      throw new ServiceException(ie);
2490    } finally {
2491      MetricsRegionServer mrs = regionServer.metricsRegionServer;
2492      if (mrs != null) {
2493        TableDescriptor td = region != null? region.getTableDescriptor(): null;
2494        if (td != null) {
2495          mrs.updateGet(td.getTableName(), EnvironmentEdgeManager.currentTime() - before);
2496        }
2497      }
2498      if (quota != null) {
2499        quota.close();
2500      }
2501    }
2502  }
2503
2504  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
2505      RpcCallContext context) throws IOException {
2506    region.prepareGet(get);
2507    boolean stale = region.getRegionInfo().getReplicaId() != 0;
2508
2509    // This method is almost the same as HRegion#get.
2510    List<Cell> results = new ArrayList<>();
2511    long before = EnvironmentEdgeManager.currentTime();
2512    // pre-get CP hook
2513    if (region.getCoprocessorHost() != null) {
2514      if (region.getCoprocessorHost().preGet(get, results)) {
2515        region.metricsUpdateForGet(results, before);
2516        return Result
2517            .create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
2518      }
2519    }
2520    Scan scan = new Scan(get);
2521    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
2522      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
2523    }
2524    RegionScannerImpl scanner = null;
2525    try {
2526      scanner = region.getScanner(scan);
2527      scanner.next(results);
2528    } finally {
2529      if (scanner != null) {
2530        if (closeCallBack == null) {
2531          // If there is a context then the scanner can be added to the current
2532          // RpcCallContext. The rpc callback will take care of closing the
2533          // scanner, for eg in case
2534          // of get()
2535          context.setCallBack(scanner);
2536        } else {
2537          // The call is from multi() where the results from the get() are
2538          // aggregated and then send out to the
2539          // rpc. The rpccall back will close all such scanners created as part
2540          // of multi().
2541          closeCallBack.addScanner(scanner);
2542        }
2543      }
2544    }
2545
2546    // post-get CP hook
2547    if (region.getCoprocessorHost() != null) {
2548      region.getCoprocessorHost().postGet(get, results);
2549    }
2550    region.metricsUpdateForGet(results, before);
2551
2552    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
2553  }
2554
2555  private void checkBatchSizeAndLogLargeSize(MultiRequest request) {
2556    int sum = 0;
2557    String firstRegionName = null;
2558    for (RegionAction regionAction : request.getRegionActionList()) {
2559      if (sum == 0) {
2560        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
2561      }
2562      sum += regionAction.getActionCount();
2563    }
2564    if (sum > rowSizeWarnThreshold) {
2565      ld.logBatchWarning(firstRegionName, sum, rowSizeWarnThreshold);
2566    }
2567  }
2568
2569  /**
2570   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2571   *
2572   * @param rpcc the RPC controller
2573   * @param request the multi request
2574   * @throws ServiceException
2575   */
2576  @Override
2577  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2578  throws ServiceException {
2579    try {
2580      checkOpen();
2581    } catch (IOException ie) {
2582      throw new ServiceException(ie);
2583    }
2584
2585    checkBatchSizeAndLogLargeSize(request);
2586
2587    // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
2588    // It is also the conduit via which we pass back data.
2589    HBaseRpcController controller = (HBaseRpcController)rpcc;
2590    CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
2591    if (controller != null) {
2592      controller.setCellScanner(null);
2593    }
2594
2595    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2596
2597    // this will contain all the cells that we need to return. It's created later, if needed.
2598    List<CellScannable> cellsToReturn = null;
2599    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2600    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2601    Boolean processed = null;
2602    RegionScannersCloseCallBack closeCallBack = null;
2603    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2604    this.rpcMultiRequestCount.increment();
2605    this.requestCount.increment();
2606    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new HashMap<>(request
2607      .getRegionActionCount());
2608    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2609    for (RegionAction regionAction : request.getRegionActionList()) {
2610      OperationQuota quota;
2611      HRegion region;
2612      regionActionResultBuilder.clear();
2613      RegionSpecifier regionSpecifier = regionAction.getRegion();
2614      try {
2615        region = getRegion(regionSpecifier);
2616        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
2617      } catch (IOException e) {
2618        rpcServer.getMetrics().exception(e);
2619        regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2620        responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2621        // All Mutations in this RegionAction not executed as we can not see the Region online here
2622        // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
2623        // corresponding to these Mutations.
2624        skipCellsForMutations(regionAction.getActionList(), cellScanner);
2625        continue;  // For this region it's a failure.
2626      }
2627
2628      if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2629        // How does this call happen?  It may need some work to play well w/ the surroundings.
2630        // Need to return an item per Action along w/ Action index.  TODO.
2631        try {
2632          if (request.hasCondition()) {
2633            Condition condition = request.getCondition();
2634            byte[] row = condition.getRow().toByteArray();
2635            byte[] family = condition.getFamily().toByteArray();
2636            byte[] qualifier = condition.getQualifier().toByteArray();
2637            CompareOperator op =
2638              CompareOperator.valueOf(condition.getCompareType().name());
2639            ByteArrayComparable comparator =
2640                ProtobufUtil.toComparator(condition.getComparator());
2641            TimeRange timeRange = condition.hasTimeRange() ?
2642              ProtobufUtil.toTimeRange(condition.getTimeRange()) :
2643              TimeRange.allTime();
2644            processed =
2645              checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
2646                qualifier, op, comparator, timeRange, regionActionResultBuilder,
2647                spaceQuotaEnforcement);
2648          } else {
2649            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
2650              cellScanner, spaceQuotaEnforcement);
2651            processed = Boolean.TRUE;
2652          }
2653        } catch (IOException e) {
2654          rpcServer.getMetrics().exception(e);
2655          // As it's atomic, we may expect it's a global failure.
2656          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2657        }
2658      } else {
2659        // doNonAtomicRegionMutation manages the exception internally
2660        if (context != null && closeCallBack == null) {
2661          // An RpcCallBack that creates a list of scanners that needs to perform callBack
2662          // operation on completion of multiGets.
2663          // Set this only once
2664          closeCallBack = new RegionScannersCloseCallBack();
2665          context.setCallBack(closeCallBack);
2666        }
2667        cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2668            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
2669            spaceQuotaEnforcement);
2670      }
2671      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2672      quota.close();
2673      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2674      if(regionLoadStats != null) {
2675        regionStats.put(regionSpecifier, regionLoadStats);
2676      }
2677    }
2678    // Load the controller with the Cells to return.
2679    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2680      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2681    }
2682
2683    if (processed != null) {
2684      responseBuilder.setProcessed(processed);
2685    }
2686
2687    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
2688    for(Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat: regionStats.entrySet()){
2689      builder.addRegion(stat.getKey());
2690      builder.addStat(stat.getValue());
2691    }
2692    responseBuilder.setRegionStatistics(builder);
2693    return responseBuilder.build();
2694  }
2695
2696  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2697    if (cellScanner == null) {
2698      return;
2699    }
2700    for (Action action : actions) {
2701      skipCellsForMutation(action, cellScanner);
2702    }
2703  }
2704
2705  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2706    if (cellScanner == null) {
2707      return;
2708    }
2709    try {
2710      if (action.hasMutation()) {
2711        MutationProto m = action.getMutation();
2712        if (m.hasAssociatedCellCount()) {
2713          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2714            cellScanner.advance();
2715          }
2716        }
2717      }
2718    } catch (IOException e) {
2719      // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2720      // marked as failed as we could not see the Region here. At client side the top level
2721      // RegionAction exception will be considered first.
2722      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2723    }
2724  }
2725
2726  /**
2727   * Mutate data in a table.
2728   *
2729   * @param rpcc the RPC controller
2730   * @param request the mutate request
2731   */
2732  @Override
2733  public MutateResponse mutate(final RpcController rpcc,
2734      final MutateRequest request) throws ServiceException {
2735    // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
2736    // It is also the conduit via which we pass back data.
2737    HBaseRpcController controller = (HBaseRpcController)rpcc;
2738    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2739    OperationQuota quota = null;
2740    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2741    ActivePolicyEnforcement spaceQuotaEnforcement = null;
2742    MutationType type = null;
2743    HRegion region = null;
2744    long before = EnvironmentEdgeManager.currentTime();
2745    // Clear scanner so we are not holding on to reference across call.
2746    if (controller != null) {
2747      controller.setCellScanner(null);
2748    }
2749    try {
2750      checkOpen();
2751      requestCount.increment();
2752      rpcMutateRequestCount.increment();
2753      region = getRegion(request.getRegion());
2754      MutateResponse.Builder builder = MutateResponse.newBuilder();
2755      MutationProto mutation = request.getMutation();
2756      if (!region.getRegionInfo().isMetaRegion()) {
2757        regionServer.cacheFlusher.reclaimMemStoreMemory();
2758      }
2759      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2760      Result r = null;
2761      Boolean processed = null;
2762      type = mutation.getMutateType();
2763
2764      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
2765      spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2766
2767      switch (type) {
2768        case APPEND:
2769          // TODO: this doesn't actually check anything.
2770          r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
2771          break;
2772        case INCREMENT:
2773          // TODO: this doesn't actually check anything.
2774          r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
2775          break;
2776        case PUT:
2777          Put put = ProtobufUtil.toPut(mutation, cellScanner);
2778          checkCellSizeLimit(region, put);
2779          // Throws an exception when violated
2780          spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
2781          quota.addMutation(put);
2782          if (request.hasCondition()) {
2783            Condition condition = request.getCondition();
2784            byte[] row = condition.getRow().toByteArray();
2785            byte[] family = condition.getFamily().toByteArray();
2786            byte[] qualifier = condition.getQualifier().toByteArray();
2787            CompareOperator compareOp =
2788              CompareOperator.valueOf(condition.getCompareType().name());
2789            ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
2790            TimeRange timeRange = condition.hasTimeRange() ?
2791              ProtobufUtil.toTimeRange(condition.getTimeRange()) :
2792              TimeRange.allTime();
2793            if (region.getCoprocessorHost() != null) {
2794              processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
2795                  compareOp, comparator, put);
2796            }
2797            if (processed == null) {
2798              boolean result = region.checkAndMutate(row, family,
2799                qualifier, compareOp, comparator, timeRange, put);
2800              if (region.getCoprocessorHost() != null) {
2801                result = region.getCoprocessorHost().postCheckAndPut(row, family,
2802                  qualifier, compareOp, comparator, put, result);
2803              }
2804              processed = result;
2805            }
2806          } else {
2807            region.put(put);
2808            processed = Boolean.TRUE;
2809          }
2810          break;
2811        case DELETE:
2812          Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
2813          checkCellSizeLimit(region, delete);
2814          spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
2815          quota.addMutation(delete);
2816          if (request.hasCondition()) {
2817            Condition condition = request.getCondition();
2818            byte[] row = condition.getRow().toByteArray();
2819            byte[] family = condition.getFamily().toByteArray();
2820            byte[] qualifier = condition.getQualifier().toByteArray();
2821            CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name());
2822            ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
2823            TimeRange timeRange = condition.hasTimeRange() ?
2824              ProtobufUtil.toTimeRange(condition.getTimeRange()) :
2825              TimeRange.allTime();
2826            if (region.getCoprocessorHost() != null) {
2827              processed = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op,
2828                  comparator, delete);
2829            }
2830            if (processed == null) {
2831              boolean result = region.checkAndMutate(row, family,
2832                qualifier, op, comparator, timeRange, delete);
2833              if (region.getCoprocessorHost() != null) {
2834                result = region.getCoprocessorHost().postCheckAndDelete(row, family,
2835                  qualifier, op, comparator, delete, result);
2836              }
2837              processed = result;
2838            }
2839          } else {
2840            region.delete(delete);
2841            processed = Boolean.TRUE;
2842          }
2843          break;
2844        default:
2845          throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
2846      }
2847      if (processed != null) {
2848        builder.setProcessed(processed.booleanValue());
2849      }
2850      boolean clientCellBlockSupported = isClientCellBlockSupport(context);
2851      addResult(builder, r, controller, clientCellBlockSupported);
2852      if (clientCellBlockSupported) {
2853        addSize(context, r, null);
2854      }
2855      return builder.build();
2856    } catch (IOException ie) {
2857      regionServer.checkFileSystem();
2858      throw new ServiceException(ie);
2859    } finally {
2860      if (quota != null) {
2861        quota.close();
2862      }
2863      // Update metrics
2864      if (regionServer.metricsRegionServer != null && type != null) {
2865        long after = EnvironmentEdgeManager.currentTime();
2866        switch (type) {
2867        case DELETE:
2868          if (request.hasCondition()) {
2869            regionServer.metricsRegionServer.updateCheckAndDelete(after - before);
2870          } else {
2871            regionServer.metricsRegionServer.updateDelete(
2872                region == null ? null : region.getRegionInfo().getTable(), after - before);
2873          }
2874          break;
2875        case PUT:
2876          if (request.hasCondition()) {
2877            regionServer.metricsRegionServer.updateCheckAndPut(after - before);
2878          } else {
2879            regionServer.metricsRegionServer.updatePut(
2880                region == null ? null : region.getRegionInfo().getTable(),after - before);
2881          }
2882          break;
2883        default:
2884          break;
2885
2886        }
2887      }
2888    }
2889  }
2890
  // Shared sentinel kept only for compatibility with old client implementations, which still
  // send a close request for a region scanner that has already been exhausted. getRegionScanner
  // throws this instance when the scanner id is found among the recently closed scanners, and
  // scan(RpcController, ScanRequest) recognises it by identity and replies with an empty response.
  // Consider removing it once support for such clients is dropped.
  @Deprecated
  private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {

    private static final long serialVersionUID = -4305297078988180130L;

    // Returns this without recording a stack trace; presumably because the instance is a shared
    // constant used only as a marker, so a trace would carry no useful information.
    @Override
    public synchronized Throwable fillInStackTrace() {
      return this;
    }
  };
2904
2905  private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
2906    String scannerName = Long.toString(request.getScannerId());
2907    RegionScannerHolder rsh = scanners.get(scannerName);
2908    if (rsh == null) {
2909      // just ignore the next or close request if scanner does not exists.
2910      if (closedScanners.getIfPresent(scannerName) != null) {
2911        throw SCANNER_ALREADY_CLOSED;
2912      } else {
2913        LOG.warn("Client tried to access missing scanner " + scannerName);
2914        throw new UnknownScannerException(
2915            "Unknown scanner '" + scannerName + "'. This can happen due to any of the following " +
2916                "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " +
2917                "long wait between consecutive client checkins, c) Server may be closing down, " +
2918                "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " +
2919                "possible fix would be increasing the value of" +
2920                "'hbase.client.scanner.timeout.period' configuration.");
2921      }
2922    }
2923    RegionInfo hri = rsh.s.getRegionInfo();
2924    // Yes, should be the same instance
2925    if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) {
2926      String msg = "Region has changed on the scanner " + scannerName + ": regionName="
2927          + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r;
2928      LOG.warn(msg + ", closing...");
2929      scanners.remove(scannerName);
2930      try {
2931        rsh.s.close();
2932      } catch (IOException e) {
2933        LOG.warn("Getting exception closing " + scannerName, e);
2934      } finally {
2935        try {
2936          regionServer.leases.cancelLease(scannerName);
2937        } catch (LeaseException e) {
2938          LOG.warn("Getting exception closing " + scannerName, e);
2939        }
2940      }
2941      throw new NotServingRegionException(msg);
2942    }
2943    return rsh;
2944  }
2945
2946  private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.Builder builder)
2947      throws IOException {
2948    HRegion region = getRegion(request.getRegion());
2949    ClientProtos.Scan protoScan = request.getScan();
2950    boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
2951    Scan scan = ProtobufUtil.toScan(protoScan);
2952    // if the request doesn't set this, get the default region setting.
2953    if (!isLoadingCfsOnDemandSet) {
2954      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
2955    }
2956
2957    if (!scan.hasFamilies()) {
2958      // Adding all families to scanner
2959      for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
2960        scan.addFamily(family);
2961      }
2962    }
2963    if (region.getCoprocessorHost() != null) {
2964      // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
2965      // wrapper for the core created RegionScanner
2966      region.getCoprocessorHost().preScannerOpen(scan);
2967    }
2968    RegionScannerImpl coreScanner = region.getScanner(scan);
2969    Shipper shipper = coreScanner;
2970    RegionScanner scanner = coreScanner;
2971    if (region.getCoprocessorHost() != null) {
2972      scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
2973    }
2974    long scannerId = scannerIdGenerator.generateNewScannerId();
2975    builder.setScannerId(scannerId);
2976    builder.setMvccReadPoint(scanner.getMvccReadPoint());
2977    builder.setTtl(scannerLeaseTimeoutPeriod);
2978    String scannerName = String.valueOf(scannerId);
2979    return addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult());
2980  }
2981
2982  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
2983      throws OutOfOrderScannerNextException {
2984    // if nextCallSeq does not match throw Exception straight away. This needs to be
2985    // performed even before checking of Lease.
2986    // See HBASE-5974
2987    if (request.hasNextCallSeq()) {
2988      long callSeq = request.getNextCallSeq();
2989      if (!rsh.incNextCallSeq(callSeq)) {
2990        throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.getNextCallSeq()
2991            + " But the nextCallSeq got from client: " + request.getNextCallSeq() + "; request="
2992            + TextFormat.shortDebugString(request));
2993      }
2994    }
2995  }
2996
2997  private void addScannerLeaseBack(Leases.Lease lease) {
2998    try {
2999      regionServer.leases.addLease(lease);
3000    } catch (LeaseStillHeldException e) {
3001      // should not happen as the scanner id is unique.
3002      throw new AssertionError(e);
3003    }
3004  }
3005
3006  private long getTimeLimit(HBaseRpcController controller, boolean allowHeartbeatMessages) {
3007    // Set the time limit to be half of the more restrictive timeout value (one of the
3008    // timeout values must be positive). In the event that both values are positive, the
3009    // more restrictive of the two is used to calculate the limit.
3010    if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
3011      long timeLimitDelta;
3012      if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
3013        timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
3014      } else {
3015        timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
3016      }
3017      if (controller != null && controller.getCallTimeout() > 0) {
3018        timeLimitDelta = Math.min(timeLimitDelta, controller.getCallTimeout());
3019      }
3020      // Use half of whichever timeout value was more restrictive... But don't allow
3021      // the time limit to be less than the allowable minimum (could cause an
3022      // immediatate timeout before scanning any data).
3023      timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
3024      // XXX: Can not use EnvironmentEdge here because TestIncrementTimeRange use a
3025      // ManualEnvironmentEdge. Consider using System.nanoTime instead.
3026      return System.currentTimeMillis() + timeLimitDelta;
3027    }
3028    // Default value of timeLimit is negative to indicate no timeLimit should be
3029    // enforced.
3030    return -1L;
3031  }
3032
3033  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
3034      ScannerContext scannerContext, ScanResponse.Builder builder) {
3035    if (numOfCompleteRows >= limitOfRows) {
3036      if (LOG.isTraceEnabled()) {
3037        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows +
3038            " scannerContext: " + scannerContext);
3039      }
3040      builder.setMoreResults(false);
3041    }
3042  }
3043
3044  // return whether we have more results in region.
3045  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
3046      long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
3047      ScanResponse.Builder builder, MutableObject lastBlock, RpcCallContext context)
3048      throws IOException {
3049    HRegion region = rsh.r;
3050    RegionScanner scanner = rsh.s;
3051    long maxResultSize;
3052    if (scanner.getMaxResultSize() > 0) {
3053      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
3054    } else {
3055      maxResultSize = maxQuotaResultSize;
3056    }
3057    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
3058    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
3059    // arbitrary 32. TODO: keep record of general size of results being returned.
3060    List<Cell> values = new ArrayList<>(32);
3061    region.startRegionOperation(Operation.SCAN);
3062    try {
3063      int numOfResults = 0;
3064      int numOfCompleteRows = 0;
3065      long before = EnvironmentEdgeManager.currentTime();
3066      synchronized (scanner) {
3067        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
3068        boolean clientHandlesPartials =
3069            request.hasClientHandlesPartials() && request.getClientHandlesPartials();
3070        boolean clientHandlesHeartbeats =
3071            request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
3072
3073        // On the server side we must ensure that the correct ordering of partial results is
3074        // returned to the client to allow them to properly reconstruct the partial results.
3075        // If the coprocessor host is adding to the result list, we cannot guarantee the
3076        // correct ordering of partial results and so we prevent partial results from being
3077        // formed.
3078        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
3079        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
3080        boolean moreRows = false;
3081
3082        // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
3083        // certain time threshold on the server. When the time threshold is exceeded, the
3084        // server stops the scan and sends back whatever Results it has accumulated within
3085        // that time period (may be empty). Since heartbeat messages have the potential to
3086        // create partial Results (in the event that the timeout occurs in the middle of a
3087        // row), we must only generate heartbeat messages when the client can handle both
3088        // heartbeats AND partials
3089        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
3090
3091        long timeLimit = getTimeLimit(controller, allowHeartbeatMessages);
3092
3093        final LimitScope sizeScope =
3094            allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
3095        final LimitScope timeScope =
3096            allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
3097
3098        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
3099
3100        // Configure with limits for this RPC. Set keep progress true since size progress
3101        // towards size limit should be kept between calls to nextRaw
3102        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
3103        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
3104        // of heap size occupied by cells(being read). Cell data means its key and value parts.
3105        contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize);
3106        contextBuilder.setBatchLimit(scanner.getBatch());
3107        contextBuilder.setTimeLimit(timeScope, timeLimit);
3108        contextBuilder.setTrackMetrics(trackMetrics);
3109        ScannerContext scannerContext = contextBuilder.build();
3110        boolean limitReached = false;
3111        while (numOfResults < maxResults) {
3112          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
3113          // batch limit is a limit on the number of cells per Result. Thus, if progress is
3114          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
3115          // reset the batch progress between nextRaw invocations since we don't want the
3116          // batch progress from previous calls to affect future calls
3117          scannerContext.setBatchProgress(0);
3118
3119          // Collect values to be returned here
3120          moreRows = scanner.nextRaw(values, scannerContext);
3121
3122          if (!values.isEmpty()) {
3123            if (limitOfRows > 0) {
3124              // First we need to check if the last result is partial and we have a row change. If
3125              // so then we need to increase the numOfCompleteRows.
3126              if (results.isEmpty()) {
3127                if (rsh.rowOfLastPartialResult != null &&
3128                    !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)) {
3129                  numOfCompleteRows++;
3130                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
3131                    builder);
3132                }
3133              } else {
3134                Result lastResult = results.get(results.size() - 1);
3135                if (lastResult.mayHaveMoreCellsInRow() &&
3136                    !CellUtil.matchingRows(values.get(0), lastResult.getRow())) {
3137                  numOfCompleteRows++;
3138                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
3139                    builder);
3140                }
3141              }
3142              if (builder.hasMoreResults() && !builder.getMoreResults()) {
3143                break;
3144              }
3145            }
3146            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
3147            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
3148            lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
3149            results.add(r);
3150            numOfResults++;
3151            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
3152              numOfCompleteRows++;
3153              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
3154              if (builder.hasMoreResults() && !builder.getMoreResults()) {
3155                break;
3156              }
3157            }
3158          } else if (!moreRows && !results.isEmpty()) {
3159            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
3160            // last result is false. Otherwise it's possible that: the first nextRaw returned
3161            // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
3162            // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
3163            // return with moreRows=false, which means moreResultsInRegion would be false, it will
3164            // be a contradictory state (HBASE-21206).
3165            int lastIdx = results.size() - 1;
3166            Result r = results.get(lastIdx);
3167            if (r.mayHaveMoreCellsInRow()) {
3168              results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
3169            }
3170          }
3171          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
3172          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
3173          boolean resultsLimitReached = numOfResults >= maxResults;
3174          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;
3175
3176          if (limitReached || !moreRows) {
3177            if (LOG.isTraceEnabled()) {
3178              LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: " + moreRows
3179                  + " scannerContext: " + scannerContext);
3180            }
3181            // We only want to mark a ScanResponse as a heartbeat message in the event that
3182            // there are more values to be read server side. If there aren't more values,
3183            // marking it as a heartbeat is wasteful because the client will need to issue
3184            // another ScanRequest only to realize that they already have all the values
3185            if (moreRows && timeLimitReached) {
3186              // Heartbeat messages occur when the time limit has been reached.
3187              builder.setHeartbeatMessage(true);
3188              if (rsh.needCursor) {
3189                Cell cursorCell = scannerContext.getLastPeekedCell();
3190                if (cursorCell != null) {
3191                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
3192                }
3193              }
3194            }
3195            break;
3196          }
3197          values.clear();
3198        }
3199        builder.setMoreResultsInRegion(moreRows);
3200        // Check to see if the client requested that we track metrics server side. If the
3201        // client requested metrics, retrieve the metrics from the scanner context.
3202        if (trackMetrics) {
3203          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
3204          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
3205          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
3206
3207          for (Entry<String, Long> entry : metrics.entrySet()) {
3208            pairBuilder.setName(entry.getKey());
3209            pairBuilder.setValue(entry.getValue());
3210            metricBuilder.addMetrics(pairBuilder.build());
3211          }
3212
3213          builder.setScanMetrics(metricBuilder.build());
3214        }
3215      }
3216      long end = EnvironmentEdgeManager.currentTime();
3217      long responseCellSize = context != null ? context.getResponseCellSize() : 0;
3218      region.getMetrics().updateScanTime(end - before);
3219      if (regionServer.metricsRegionServer != null) {
3220        regionServer.metricsRegionServer.updateScanSize(
3221            region.getTableDescriptor().getTableName(), responseCellSize);
3222        regionServer.metricsRegionServer.updateScanTime(
3223            region.getTableDescriptor().getTableName(), end - before);
3224      }
3225    } finally {
3226      region.closeRegionOperation();
3227    }
3228    // coprocessor postNext hook
3229    if (region.getCoprocessorHost() != null) {
3230      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
3231    }
3232  }
3233
3234  /**
3235   * Scan data in a table.
3236   *
3237   * @param controller the RPC controller
3238   * @param request the scan request
3239   * @throws ServiceException
3240   */
3241  @Override
3242  public ScanResponse scan(final RpcController controller, final ScanRequest request)
3243      throws ServiceException {
3244    if (controller != null && !(controller instanceof HBaseRpcController)) {
3245      throw new UnsupportedOperationException(
3246          "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
3247    }
3248    if (!request.hasScannerId() && !request.hasScan()) {
3249      throw new ServiceException(
3250          new DoNotRetryIOException("Missing required input: scannerId or scan"));
3251    }
3252    try {
3253      checkOpen();
3254    } catch (IOException e) {
3255      if (request.hasScannerId()) {
3256        String scannerName = Long.toString(request.getScannerId());
3257        if (LOG.isDebugEnabled()) {
3258          LOG.debug(
3259            "Server shutting down and client tried to access missing scanner " + scannerName);
3260        }
3261        if (regionServer.leases != null) {
3262          try {
3263            regionServer.leases.cancelLease(scannerName);
3264          } catch (LeaseException le) {
3265            // No problem, ignore
3266            if (LOG.isTraceEnabled()) {
3267              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
3268            }
3269          }
3270        }
3271      }
3272      throw new ServiceException(e);
3273    }
3274    requestCount.increment();
3275    rpcScanRequestCount.increment();
3276    RegionScannerHolder rsh;
3277    ScanResponse.Builder builder = ScanResponse.newBuilder();
3278    try {
3279      if (request.hasScannerId()) {
3280        // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
3281        // for more details.
3282        builder.setScannerId(request.getScannerId());
3283        rsh = getRegionScanner(request);
3284      } else {
3285        rsh = newRegionScanner(request, builder);
3286      }
3287    } catch (IOException e) {
3288      if (e == SCANNER_ALREADY_CLOSED) {
3289        // Now we will close scanner automatically if there are no more results for this region but
3290        // the old client will still send a close request to us. Just ignore it and return.
3291        return builder.build();
3292      }
3293      throw new ServiceException(e);
3294    }
3295    HRegion region = rsh.r;
3296    String scannerName = rsh.scannerName;
3297    Leases.Lease lease;
3298    try {
3299      // Remove lease while its being processed in server; protects against case
3300      // where processing of request takes > lease expiration time.
3301      lease = regionServer.leases.removeLease(scannerName);
3302    } catch (LeaseException e) {
3303      throw new ServiceException(e);
3304    }
3305    if (request.hasRenew() && request.getRenew()) {
3306      // add back and return
3307      addScannerLeaseBack(lease);
3308      try {
3309        checkScanNextCallSeq(request, rsh);
3310      } catch (OutOfOrderScannerNextException e) {
3311        throw new ServiceException(e);
3312      }
3313      return builder.build();
3314    }
3315    OperationQuota quota;
3316    try {
3317      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
3318    } catch (IOException e) {
3319      addScannerLeaseBack(lease);
3320      throw new ServiceException(e);
3321    }
3322    try {
3323      checkScanNextCallSeq(request, rsh);
3324    } catch (OutOfOrderScannerNextException e) {
3325      addScannerLeaseBack(lease);
3326      throw new ServiceException(e);
3327    }
3328    // Now we have increased the next call sequence. If we give client an error, the retry will
3329    // never success. So we'd better close the scanner and return a DoNotRetryIOException to client
3330    // and then client will try to open a new scanner.
3331    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
3332    int rows; // this is scan.getCaching
3333    if (request.hasNumberOfRows()) {
3334      rows = request.getNumberOfRows();
3335    } else {
3336      rows = closeScanner ? 0 : 1;
3337    }
3338    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
3339    // now let's do the real scan.
3340    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
3341    RegionScanner scanner = rsh.s;
3342    // this is the limit of rows for this scan, if we the number of rows reach this value, we will
3343    // close the scanner.
3344    int limitOfRows;
3345    if (request.hasLimitOfRows()) {
3346      limitOfRows = request.getLimitOfRows();
3347    } else {
3348      limitOfRows = -1;
3349    }
3350    MutableObject<Object> lastBlock = new MutableObject<>();
3351    boolean scannerClosed = false;
3352    try {
3353      List<Result> results = new ArrayList<>();
3354      if (rows > 0) {
3355        boolean done = false;
3356        // Call coprocessor. Get region info from scanner.
3357        if (region.getCoprocessorHost() != null) {
3358          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
3359          if (!results.isEmpty()) {
3360            for (Result r : results) {
3361              lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
3362            }
3363          }
3364          if (bypass != null && bypass.booleanValue()) {
3365            done = true;
3366          }
3367        }
3368        if (!done) {
3369          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
3370            results, builder, lastBlock, context);
3371        } else {
3372          builder.setMoreResultsInRegion(!results.isEmpty());
3373        }
3374      } else {
3375        // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
3376        builder.setMoreResultsInRegion(true);
3377      }
3378
3379      quota.addScanResult(results);
3380      addResults(builder, results, (HBaseRpcController) controller,
3381        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
3382        isClientCellBlockSupport(context));
3383      if (scanner.isFilterDone() && results.isEmpty()) {
3384        // If the scanner's filter - if any - is done with the scan
3385        // only set moreResults to false if the results is empty. This is used to keep compatible
3386        // with the old scan implementation where we just ignore the returned results if moreResults
3387        // is false. Can remove the isEmpty check after we get rid of the old implementation.
3388        builder.setMoreResults(false);
3389      }
3390      // Later we may close the scanner depending on this flag so here we need to make sure that we
3391      // have already set this flag.
3392      assert builder.hasMoreResultsInRegion();
3393      // we only set moreResults to false in the above code, so set it to true if we haven't set it
3394      // yet.
3395      if (!builder.hasMoreResults()) {
3396        builder.setMoreResults(true);
3397      }
3398      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
3399        // Record the last cell of the last result if it is a partial result
3400        // We need this to calculate the complete rows we have returned to client as the
3401        // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
3402        // current row. We may filter out all the remaining cells for the current row and just
3403        // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
3404        // check for row change.
3405        Result lastResult = results.get(results.size() - 1);
3406        if (lastResult.mayHaveMoreCellsInRow()) {
3407          rsh.rowOfLastPartialResult = lastResult.getRow();
3408        } else {
3409          rsh.rowOfLastPartialResult = null;
3410        }
3411      }
3412      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
3413        scannerClosed = true;
3414        closeScanner(region, scanner, scannerName, context);
3415      }
3416      return builder.build();
3417    } catch (IOException e) {
3418      try {
3419        // scanner is closed here
3420        scannerClosed = true;
3421        // The scanner state might be left in a dirty state, so we will tell the Client to
3422        // fail this RPC and close the scanner while opening up another one from the start of
3423        // row that the client has last seen.
3424        closeScanner(region, scanner, scannerName, context);
3425
3426        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
3427        // used in two different semantics.
3428        // (1) The first is to close the client scanner and bubble up the exception all the way
3429        // to the application. This is preferred when the exception is really un-recoverable
3430        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
3431        // bucket usually.
3432        // (2) Second semantics is to close the current region scanner only, but continue the
3433        // client scanner by overriding the exception. This is usually UnknownScannerException,
3434        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
3435        // application-level ClientScanner has to continue without bubbling up the exception to
3436        // the client. See ClientScanner code to see how it deals with these special exceptions.
3437        if (e instanceof DoNotRetryIOException) {
3438          throw e;
3439        }
3440
3441        // If it is a FileNotFoundException, wrap as a
3442        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
3443        if (e instanceof FileNotFoundException) {
3444          throw new DoNotRetryIOException(e);
3445        }
3446
3447        // We closed the scanner already. Instead of throwing the IOException, and client
3448        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
3449        // a special exception to save an RPC.
3450        if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
3451          // 1.4.0+ clients know how to handle
3452          throw new ScannerResetException("Scanner is closed on the server-side", e);
3453        } else {
3454          // older clients do not know about SRE. Just throw USE, which they will handle
3455          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
3456              + " scanner state for clients older than 1.3.", e);
3457        }
3458      } catch (IOException ioe) {
3459        throw new ServiceException(ioe);
3460      }
3461    } finally {
3462      if (!scannerClosed) {
3463        // Adding resets expiration time on lease.
3464        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
3465        if (context != null) {
3466          context.setCallBack(rsh.shippedCallback);
3467        } else {
3468          // When context != null, adding back the lease will be done in callback set above.
3469          addScannerLeaseBack(lease);
3470        }
3471      }
3472      quota.close();
3473    }
3474  }
3475
3476  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
3477      RpcCallContext context) throws IOException {
3478    if (region.getCoprocessorHost() != null) {
3479      if (region.getCoprocessorHost().preScannerClose(scanner)) {
3480        // bypass the actual close.
3481        return;
3482      }
3483    }
3484    RegionScannerHolder rsh = scanners.remove(scannerName);
3485    if (rsh != null) {
3486      if (context != null) {
3487        context.setCallBack(rsh.closeCallBack);
3488      } else {
3489        rsh.s.close();
3490      }
3491      if (region.getCoprocessorHost() != null) {
3492        region.getCoprocessorHost().postScannerClose(scanner);
3493      }
3494      closedScanners.put(scannerName, scannerName);
3495    }
3496  }
3497
3498  @Override
3499  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
3500      CoprocessorServiceRequest request) throws ServiceException {
3501    rpcPreCheck("execRegionServerService");
3502    return regionServer.execRegionServerService(controller, request);
3503  }
3504
3505  @Override
3506  public UpdateConfigurationResponse updateConfiguration(
3507      RpcController controller, UpdateConfigurationRequest request)
3508      throws ServiceException {
3509    try {
3510      this.regionServer.updateConfiguration();
3511    } catch (Exception e) {
3512      throw new ServiceException(e);
3513    }
3514    return UpdateConfigurationResponse.getDefaultInstance();
3515  }
3516
3517  @Override
3518  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(
3519      RpcController controller, GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
3520    try {
3521      final RegionServerSpaceQuotaManager manager =
3522          regionServer.getRegionServerSpaceQuotaManager();
3523      final GetSpaceQuotaSnapshotsResponse.Builder builder =
3524          GetSpaceQuotaSnapshotsResponse.newBuilder();
3525      if (manager != null) {
3526        final Map<TableName,SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
3527        for (Entry<TableName,SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
3528          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
3529              .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
3530              .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue()))
3531              .build());
3532        }
3533      }
3534      return builder.build();
3535    } catch (Exception e) {
3536      throw new ServiceException(e);
3537    }
3538  }
3539
3540  @Override
3541  public ExecuteProceduresResponse executeProcedures(RpcController controller,
3542       ExecuteProceduresRequest request) throws ServiceException {
3543    ExecuteProceduresResponse.Builder builder = ExecuteProceduresResponse.newBuilder();
3544    if (request.getOpenRegionCount() > 0) {
3545      for (OpenRegionRequest req : request.getOpenRegionList()) {
3546        builder.addOpenRegion(openRegion(controller, req));
3547      }
3548    }
3549    if (request.getCloseRegionCount() > 0) {
3550      for (CloseRegionRequest req : request.getCloseRegionList()) {
3551        builder.addCloseRegion(closeRegion(controller, req));
3552      }
3553    }
3554    return builder.build();
3555  }
3556
3557  @Override
3558  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
3559      ClearRegionBlockCacheRequest request) {
3560    ClearRegionBlockCacheResponse.Builder builder =
3561        ClearRegionBlockCacheResponse.newBuilder();
3562    CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
3563    List<HRegion> regions = getRegions(request.getRegionList(), stats);
3564    for (HRegion region : regions) {
3565      try {
3566        stats = stats.append(this.regionServer.clearRegionBlockCache(region));
3567      } catch (Exception e) {
3568        stats.addException(region.getRegionInfo().getRegionName(), e);
3569      }
3570    }
3571    stats.withMaxCacheSize(regionServer.getCacheConfig().getBlockCache().getMaxSize());
3572    return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
3573  }
3574}