001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.io.UncheckedIOException; 024import java.lang.reflect.InvocationTargetException; 025import java.net.BindException; 026import java.net.InetSocketAddress; 027import java.nio.ByteBuffer; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.HashMap; 032import java.util.Iterator; 033import java.util.List; 034import java.util.Map; 035import java.util.Map.Entry; 036import java.util.NavigableMap; 037import java.util.Optional; 038import java.util.Set; 039import java.util.TreeSet; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.ConcurrentMap; 042import java.util.concurrent.TimeUnit; 043import java.util.concurrent.atomic.AtomicBoolean; 044import java.util.concurrent.atomic.AtomicLong; 045import java.util.concurrent.atomic.LongAdder; 046import org.apache.commons.lang3.mutable.MutableObject; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.fs.Path; 
049import org.apache.hadoop.hbase.ByteBufferExtendedCell; 050import org.apache.hadoop.hbase.CacheEvictionStats; 051import org.apache.hadoop.hbase.CacheEvictionStatsBuilder; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.CellScannable; 054import org.apache.hadoop.hbase.CellScanner; 055import org.apache.hadoop.hbase.CellUtil; 056import org.apache.hadoop.hbase.CompareOperator; 057import org.apache.hadoop.hbase.DoNotRetryIOException; 058import org.apache.hadoop.hbase.DroppedSnapshotException; 059import org.apache.hadoop.hbase.HBaseIOException; 060import org.apache.hadoop.hbase.HConstants; 061import org.apache.hadoop.hbase.MultiActionResultTooLarge; 062import org.apache.hadoop.hbase.NotServingRegionException; 063import org.apache.hadoop.hbase.PrivateCellUtil; 064import org.apache.hadoop.hbase.RegionTooBusyException; 065import org.apache.hadoop.hbase.Server; 066import org.apache.hadoop.hbase.ServerName; 067import org.apache.hadoop.hbase.TableName; 068import org.apache.hadoop.hbase.UnknownScannerException; 069import org.apache.hadoop.hbase.client.Append; 070import org.apache.hadoop.hbase.client.ConnectionUtils; 071import org.apache.hadoop.hbase.client.Delete; 072import org.apache.hadoop.hbase.client.Durability; 073import org.apache.hadoop.hbase.client.Get; 074import org.apache.hadoop.hbase.client.Increment; 075import org.apache.hadoop.hbase.client.Mutation; 076import org.apache.hadoop.hbase.client.Put; 077import org.apache.hadoop.hbase.client.RegionInfo; 078import org.apache.hadoop.hbase.client.RegionReplicaUtil; 079import org.apache.hadoop.hbase.client.Result; 080import org.apache.hadoop.hbase.client.Row; 081import org.apache.hadoop.hbase.client.RowMutations; 082import org.apache.hadoop.hbase.client.Scan; 083import org.apache.hadoop.hbase.client.TableDescriptor; 084import org.apache.hadoop.hbase.client.VersionInfoUtil; 085import org.apache.hadoop.hbase.conf.ConfigurationObserver; 086import 
org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 087import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 088import org.apache.hadoop.hbase.exceptions.ScannerResetException; 089import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 090import org.apache.hadoop.hbase.filter.ByteArrayComparable; 091import org.apache.hadoop.hbase.filter.Filter; 092import org.apache.hadoop.hbase.io.ByteBuffAllocator; 093import org.apache.hadoop.hbase.io.TimeRange; 094import org.apache.hadoop.hbase.io.hfile.BlockCache; 095import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; 096import org.apache.hadoop.hbase.ipc.HBaseRpcController; 097import org.apache.hadoop.hbase.ipc.PriorityFunction; 098import org.apache.hadoop.hbase.ipc.QosPriority; 099import org.apache.hadoop.hbase.ipc.RpcCallContext; 100import org.apache.hadoop.hbase.ipc.RpcCallback; 101import org.apache.hadoop.hbase.ipc.RpcScheduler; 102import org.apache.hadoop.hbase.ipc.RpcServer; 103import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 104import org.apache.hadoop.hbase.ipc.RpcServerFactory; 105import org.apache.hadoop.hbase.ipc.RpcServerInterface; 106import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 107import org.apache.hadoop.hbase.ipc.ServerRpcController; 108import org.apache.hadoop.hbase.log.HBaseMarkers; 109import org.apache.hadoop.hbase.master.HMaster; 110import org.apache.hadoop.hbase.master.MasterRpcServices; 111import org.apache.hadoop.hbase.net.Address; 112import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 113import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; 114import org.apache.hadoop.hbase.quotas.OperationQuota; 115import org.apache.hadoop.hbase.quotas.QuotaUtil; 116import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 117import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 118import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 119import 
org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; 120import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; 121import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease; 122import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException; 123import org.apache.hadoop.hbase.regionserver.Region.Operation; 124import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 125import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 126import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler; 127import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; 128import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; 129import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; 130import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; 131import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder; 132import org.apache.hadoop.hbase.security.Superusers; 133import org.apache.hadoop.hbase.security.User; 134import org.apache.hadoop.hbase.security.access.AccessChecker; 135import org.apache.hadoop.hbase.security.access.NoopAccessChecker; 136import org.apache.hadoop.hbase.security.access.Permission; 137import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; 138import org.apache.hadoop.hbase.util.Bytes; 139import org.apache.hadoop.hbase.util.DNS; 140import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 141import org.apache.hadoop.hbase.util.Pair; 142import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 143import org.apache.hadoop.hbase.wal.WAL; 144import org.apache.hadoop.hbase.wal.WALEdit; 145import org.apache.hadoop.hbase.wal.WALKey; 146import org.apache.hadoop.hbase.wal.WALSplitUtil; 147import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 148import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 149import org.apache.yetus.audience.InterfaceAudience; 150import 
org.apache.zookeeper.KeeperException; 151import org.slf4j.Logger; 152import org.slf4j.LoggerFactory; 153 154import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 155import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 156import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 157import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 158import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 159import org.apache.hbase.thirdparty.com.google.protobuf.Message; 160import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 161import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 162import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 163import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 164import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 165 166import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 167import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 168import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 177import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 
198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 219import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 240import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 249import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics; 250import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 251import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 252import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; 253import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 254import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; 255import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 256import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 257import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 258import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 259 260/** 261 * Implements the regionserver RPC services. 
262 */ 263@InterfaceAudience.Private 264@SuppressWarnings("deprecation") 265public class RSRpcServices implements HBaseRPCErrorHandler, 266 AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction, 267 ConfigurationObserver { 268 protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class); 269 270 /** RPC scheduler to use for the region server. */ 271 public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = 272 "hbase.region.server.rpc.scheduler.factory.class"; 273 274 /** RPC scheduler to use for the master. */ 275 public static final String MASTER_RPC_SCHEDULER_FACTORY_CLASS = 276 "hbase.master.rpc.scheduler.factory.class"; 277 278 /** 279 * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This 280 * configuration exists to prevent the scenario where a time limit is specified to be so 281 * restrictive that the time limit is reached immediately (before any cells are scanned). 282 */ 283 private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 284 "hbase.region.server.rpc.minimum.scan.time.limit.delta"; 285 /** 286 * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA} 287 */ 288 private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10; 289 290 /** 291 * Number of rows in a batch operation above which a warning will be logged. 
292 */ 293 static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold"; 294 /** 295 * Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME} 296 */ 297 static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000; 298 299 /* 300 * Whether to reject rows with size > threshold defined by 301 * {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME} 302 */ 303 private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD = 304 "hbase.rpc.rows.size.threshold.reject"; 305 306 /* 307 * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD} 308 */ 309 private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false; 310 311 // Request counter. (Includes requests that are not serviced by regions.) 312 // Count only once for requests with multiple actions like multi/caching-scan/replayBatch 313 final LongAdder requestCount = new LongAdder(); 314 315 // Request counter for rpc get 316 final LongAdder rpcGetRequestCount = new LongAdder(); 317 318 // Request counter for rpc scan 319 final LongAdder rpcScanRequestCount = new LongAdder(); 320 321 // Request counter for rpc multi 322 final LongAdder rpcMultiRequestCount = new LongAdder(); 323 324 // Request counter for rpc mutate 325 final LongAdder rpcMutateRequestCount = new LongAdder(); 326 327 // Server to handle client requests. 328 final RpcServerInterface rpcServer; 329 final InetSocketAddress isa; 330 331 @VisibleForTesting 332 protected final HRegionServer regionServer; 333 private final long maxScannerResultSize; 334 335 // The reference to the priority extraction function 336 private final PriorityFunction priority; 337 338 private ScannerIdGenerator scannerIdGenerator; 339 private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>(); 340 // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients 341 // which may send next or close request to a region scanner which has already been exhausted. 
// The entries will be removed automatically after scannerLeaseTimeoutPeriod.
  private final Cache<String, String> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;
  /*
   * Whether we should reject requests with very high no of rows i.e. beyond threshold
   * defined by rowSizeWarnThreshold
   */
  private final boolean rejectRowsWithSizeOverThreshold;

  // Starts out false. NOTE(review): presumably flipped while a clear-compaction-queues request is
  // in flight -- confirm against the code that reads and writes it.
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  // Neither field is final, so both are assigned after construction.
  // NOTE(review): presumably set up when the services are started -- confirm.
  private AccessChecker accessChecker;
  private ZKPermissionWatcher zkPermissionWatcher;

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below
   * booleans to selectively enable/disable either Admin or Client Service (Rare is the case
   * where you would ever turn off one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
      "hbase.regionserver.admin.executorService";
  // Client-service counterpart of REGIONSERVER_ADMIN_SERVICE_CONFIG.
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
      "hbase.regionserver.client.executorService";

  /**
   * An Rpc callback for closing a RegionScanner.
   * <p>
   * {@link #run()} simply closes the scanner handed to the constructor; an {@link IOException}
   * raised by the close is propagated to whoever runs the callback.
   */
  private static final class RegionScannerCloseCallBack implements RpcCallback {

    // The scanner to close when the callback runs.
    private final RegionScanner scanner;

    public RegionScannerCloseCallBack(RegionScanner scanner) {
      this.scanner = scanner;
    }

    @Override
    public void run() throws IOException {
      this.scanner.close();
    }
  }

  /**
   * An Rpc callback for doing shipped() call on a RegionScanner.
403 */ 404 private class RegionScannerShippedCallBack implements RpcCallback { 405 406 private final String scannerName; 407 private final Shipper shipper; 408 private final Lease lease; 409 410 public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) { 411 this.scannerName = scannerName; 412 this.shipper = shipper; 413 this.lease = lease; 414 } 415 416 @Override 417 public void run() throws IOException { 418 this.shipper.shipped(); 419 // We're done. On way out re-add the above removed lease. The lease was temp removed for this 420 // Rpc call and we are at end of the call now. Time to add it back. 421 if (scanners.containsKey(scannerName)) { 422 if (lease != null) { 423 regionServer.getLeaseManager().addLease(lease); 424 } 425 } 426 } 427 } 428 429 /** 430 * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on 431 * completion of multiGets. 432 */ 433 static class RegionScannersCloseCallBack implements RpcCallback { 434 private final List<RegionScanner> scanners = new ArrayList<>(); 435 436 public void addScanner(RegionScanner scanner) { 437 this.scanners.add(scanner); 438 } 439 440 @Override 441 public void run() { 442 for (RegionScanner scanner : scanners) { 443 try { 444 scanner.close(); 445 } catch (IOException e) { 446 LOG.error("Exception while closing the scanner " + scanner, e); 447 } 448 } 449 } 450 } 451 452 /** 453 * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together. 
454 */ 455 private static final class RegionScannerHolder { 456 457 private final AtomicLong nextCallSeq = new AtomicLong(0); 458 private final String scannerName; 459 private final RegionScanner s; 460 private final HRegion r; 461 private final RpcCallback closeCallBack; 462 private final RpcCallback shippedCallback; 463 private byte[] rowOfLastPartialResult; 464 private boolean needCursor; 465 466 public RegionScannerHolder(String scannerName, RegionScanner s, HRegion r, 467 RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor) { 468 this.scannerName = scannerName; 469 this.s = s; 470 this.r = r; 471 this.closeCallBack = closeCallBack; 472 this.shippedCallback = shippedCallback; 473 this.needCursor = needCursor; 474 } 475 476 public long getNextCallSeq() { 477 return nextCallSeq.get(); 478 } 479 480 public boolean incNextCallSeq(long currentSeq) { 481 // Use CAS to prevent multiple scan request running on the same scanner. 482 return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1); 483 } 484 } 485 486 /** 487 * Instantiated as a scanner lease. 
If the lease times out, the scanner is 488 * closed 489 */ 490 private class ScannerListener implements LeaseListener { 491 private final String scannerName; 492 493 ScannerListener(final String n) { 494 this.scannerName = n; 495 } 496 497 @Override 498 public void leaseExpired() { 499 RegionScannerHolder rsh = scanners.remove(this.scannerName); 500 if (rsh != null) { 501 RegionScanner s = rsh.s; 502 LOG.info("Scanner " + this.scannerName + " lease expired on region " 503 + s.getRegionInfo().getRegionNameAsString()); 504 HRegion region = null; 505 try { 506 region = regionServer.getRegion(s.getRegionInfo().getRegionName()); 507 if (region != null && region.getCoprocessorHost() != null) { 508 region.getCoprocessorHost().preScannerClose(s); 509 } 510 } catch (IOException e) { 511 LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e); 512 } finally { 513 try { 514 s.close(); 515 if (region != null && region.getCoprocessorHost() != null) { 516 region.getCoprocessorHost().postScannerClose(s); 517 } 518 } catch (IOException e) { 519 LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e); 520 } 521 } 522 } else { 523 LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" + 524 " scanner found, hence no chance to close that related scanner!"); 525 } 526 } 527 } 528 529 private static ResultOrException getResultOrException(final ClientProtos.Result r, 530 final int index){ 531 return getResultOrException(ResponseConverter.buildActionResult(r), index); 532 } 533 534 private static ResultOrException getResultOrException(final Exception e, final int index) { 535 return getResultOrException(ResponseConverter.buildActionResult(e), index); 536 } 537 538 private static ResultOrException getResultOrException( 539 final ResultOrException.Builder builder, final int index) { 540 return builder.setIndex(index).build(); 541 } 542 543 /** 544 * Checks for the following pre-checks in order: 545 * <ol> 546 * 
<li>RegionServer is running</li> 547 * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li> 548 * </ol> 549 * @param requestName name of rpc request. Used in reporting failures to provide context. 550 * @throws ServiceException If any of the above listed pre-check fails. 551 */ 552 private void rpcPreCheck(String requestName) throws ServiceException { 553 try { 554 checkOpen(); 555 requirePermission(requestName, Permission.Action.ADMIN); 556 } catch (IOException ioe) { 557 throw new ServiceException(ioe); 558 } 559 } 560 561 /** 562 * Starts the nonce operation for a mutation, if needed. 563 * @param mutation Mutation. 564 * @param nonceGroup Nonce group from the request. 565 * @return whether to proceed this mutation. 566 */ 567 private boolean startNonceOperation(final MutationProto mutation, long nonceGroup) 568 throws IOException { 569 if (regionServer.nonceManager == null || !mutation.hasNonce()) return true; 570 boolean canProceed = false; 571 try { 572 canProceed = regionServer.nonceManager.startOperation( 573 nonceGroup, mutation.getNonce(), regionServer); 574 } catch (InterruptedException ex) { 575 throw new InterruptedIOException("Nonce start operation interrupted"); 576 } 577 return canProceed; 578 } 579 580 /** 581 * Ends nonce operation for a mutation, if needed. 582 * @param mutation Mutation. 583 * @param nonceGroup Nonce group from the request. Always 0 in initial implementation. 584 * @param success Whether the operation for this nonce has succeeded. 
585 */ 586 private void endNonceOperation(final MutationProto mutation, 587 long nonceGroup, boolean success) { 588 if (regionServer.nonceManager != null && mutation.hasNonce()) { 589 regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success); 590 } 591 } 592 593 private boolean isClientCellBlockSupport(RpcCallContext context) { 594 return context != null && context.isClientCellBlockSupported(); 595 } 596 597 private void addResult(final MutateResponse.Builder builder, final Result result, 598 final HBaseRpcController rpcc, boolean clientCellBlockSupported) { 599 if (result == null) return; 600 if (clientCellBlockSupported) { 601 builder.setResult(ProtobufUtil.toResultNoData(result)); 602 rpcc.setCellScanner(result.cellScanner()); 603 } else { 604 ClientProtos.Result pbr = ProtobufUtil.toResult(result); 605 builder.setResult(pbr); 606 } 607 } 608 609 private void addResults(ScanResponse.Builder builder, List<Result> results, 610 HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) { 611 builder.setStale(!isDefaultRegion); 612 if (results.isEmpty()) { 613 return; 614 } 615 if (clientCellBlockSupported) { 616 for (Result res : results) { 617 builder.addCellsPerResult(res.size()); 618 builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow()); 619 } 620 controller.setCellScanner(CellUtil.createCellScanner(results)); 621 } else { 622 for (Result res : results) { 623 ClientProtos.Result pbr = ProtobufUtil.toResult(res); 624 builder.addResults(pbr); 625 } 626 } 627 } 628 629 /** 630 * Mutate a list of rows atomically. 631 * @param cellScanner if non-null, the mutation data -- the Cell content. 
*/
  private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
    final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
    CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange,
    RegionActionResult.Builder builder, ActivePolicyEnforcement spaceQuotaEnforcement)
    throws IOException {
    // Number of actions already converted to a Put/Delete. The finally block below uses it as the
    // index of the first action that still has to be handed to skipCellsForMutation.
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaRegion()) {
        regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      // Created lazily from the row of the first action.
      // NOTE(review): stays null when actions is empty and is then passed on to
      // region.checkAndRowMutate -- confirm callers never pass an empty list.
      RowMutations rm = null;
      int i = 0;
      // One builder instance, cleared and reused for every action.
      ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
        ClientProtos.ResultOrException.newBuilder();
      for (ClientProtos.Action action: actions) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
            action.getGet());
        }
        MutationType type = action.getMutation().getMutateType();
        if (rm == null) {
          rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size());
        }
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
            // Counted right after conversion, before the checks below can throw.
            ++countOfCompleteMutation;
            // Only puts are checked against the cell size limit; deletes are not.
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            rm.add(put);
            break;
          case DELETE:
            Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
            ++countOfCompleteMutation;
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            rm.add(del);
            break;
          default:
            throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
        }
        // To unify the response format with doNonAtomicRegionMutation and read through client's
        // AsyncProcess we have to add an empty result instance per operation
        resultOrExceptionOrBuilder.clear();
        resultOrExceptionOrBuilder.setIndex(i++);
        builder.addResultOrException(
          resultOrExceptionOrBuilder.build());
      }

      // A filter, when given, takes precedence over the family/qualifier/op/comparator condition.
      if (filter != null) {
        return region.checkAndRowMutate(row, filter, timeRange, rm);
      } else {
        return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm);
      }
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      // Runs on both normal and exceptional exit; after a fully processed list the loop is empty.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }

  /**
   * Execute an append mutation.
   * <p>
   * The mutation is converted to an {@link Append}, checked against the cell size limit and the
   * space quota policy, and charged to the operation quota. If the region has a coprocessor host,
   * its preAppend hook runs first; a non-null result from that hook bypasses the append itself as
   * well as the postAppend hook. Otherwise the append is applied, or, when the nonce operation
   * may not proceed, a get is executed in its place, and the outcome is passed through postAppend
   * when a coprocessor host exists. The append metric is updated in either case when region
   * server metrics are available.
   *
   * @return the result to return to the client; never null, {@link Result#EMPTY_RESULT} is
   *   substituted when no result was produced
   */
  private Result append(final HRegion region, final OperationQuota quota,
      final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
      ActivePolicyEnforcement spaceQuota)
      throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
    checkCellSizeLimit(region, append);
    spaceQuota.getPolicyEnforcement(region).check(append);
    quota.addMutation(append);
    Result r = null;
    if (region.getCoprocessorHost() != null) {
      r = region.getCoprocessorHost().preAppend(append);
    }
    if (r == null) {
      boolean canProceed = startNonceOperation(mutation, nonceGroup);
      // Flipped to true only after the append (or the substitute get) returned normally.
      boolean success = false;
      try {
        long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
        if (canProceed) {
          r = region.append(append, nonceGroup, nonce);
        } else {
          // convert duplicate append to get
          List<Cell> results = region.get(toGet(append), false, nonceGroup, nonce);
          r = Result.create(results);
        }
        success = true;
      } finally {
        // Only end a nonce operation that was allowed to proceed.
        if (canProceed) {
          endNonceOperation(mutation, nonceGroup, success);
        }
      }
      if (region.getCoprocessorHost() != null) {
        r = region.getCoprocessorHost().postAppend(append, r);
      }
    }
    if (regionServer.getMetrics() != null) {
      regionServer.getMetrics().updateAppend(
        region.getTableDescriptor().getTableName(),
        EnvironmentEdgeManager.currentTime() - before);
    }
    return r == null ? Result.EMPTY_RESULT : r;
  }

  /**
   * Execute an increment mutation.
   */
  private Result increment(final HRegion region, final OperationQuota quota,
      final MutationProto mutation, final CellScanner cells, long nonceGroup,
      ActivePolicyEnforcement spaceQuota)
      throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
    checkCellSizeLimit(region, increment);
    spaceQuota.getPolicyEnforcement(region).check(increment);
    quota.addMutation(increment);
    Result r = null;
    if (region.getCoprocessorHost() != null) {
      r = region.getCoprocessorHost().preIncrement(increment);
    }
    if (r == null) {
      boolean canProceed = startNonceOperation(mutation, nonceGroup);
      boolean success = false;
      try {
        long nonce = mutation.hasNonce() ?
mutation.getNonce() : HConstants.NO_NONCE; 765 if (canProceed) { 766 r = region.increment(increment, nonceGroup, nonce); 767 } else { 768 // convert duplicate increment to get 769 List<Cell> results = region.get(toGet(increment), false, nonceGroup, nonce); 770 r = Result.create(results); 771 } 772 success = true; 773 } finally { 774 if (canProceed) { 775 endNonceOperation(mutation, nonceGroup, success); 776 } 777 } 778 if (region.getCoprocessorHost() != null) { 779 r = region.getCoprocessorHost().postIncrement(increment, r); 780 } 781 } 782 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 783 if (metricsRegionServer != null) { 784 metricsRegionServer.updateIncrement( 785 region.getTableDescriptor().getTableName(), 786 EnvironmentEdgeManager.currentTime() - before); 787 } 788 return r == null ? Result.EMPTY_RESULT : r; 789 } 790 791 private static Get toGet(final Mutation mutation) throws IOException { 792 if(!(mutation instanceof Increment) && !(mutation instanceof Append)) { 793 throw new AssertionError("mutation must be a instance of Increment or Append"); 794 } 795 Get get = new Get(mutation.getRow()); 796 CellScanner cellScanner = mutation.cellScanner(); 797 while (!cellScanner.advance()) { 798 Cell cell = cellScanner.current(); 799 get.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)); 800 } 801 if (mutation instanceof Increment) { 802 // Increment 803 Increment increment = (Increment) mutation; 804 get.setTimeRange(increment.getTimeRange().getMin(), increment.getTimeRange().getMax()); 805 } else { 806 // Append 807 Append append = (Append) mutation; 808 get.setTimeRange(append.getTimeRange().getMin(), append.getTimeRange().getMax()); 809 } 810 for (Entry<String, byte[]> entry : mutation.getAttributesMap().entrySet()) { 811 get.setAttribute(entry.getKey(), entry.getValue()); 812 } 813 return get; 814 } 815 816 /** 817 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when 818 * 
done, add an instance of a {@link ResultOrException} that corresponds to each Mutation. 819 * @param cellsToReturn Could be null. May be allocated in this method. This is what this 820 * method returns as a 'result'. 821 * @param closeCallBack the callback to be used with multigets 822 * @param context the current RpcCallContext 823 * @return Return the <code>cellScanner</code> passed 824 */ 825 private List<CellScannable> doNonAtomicRegionMutation(final HRegion region, 826 final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner, 827 final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup, 828 final RegionScannersCloseCallBack closeCallBack, RpcCallContext context, 829 ActivePolicyEnforcement spaceQuotaEnforcement) { 830 // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do 831 // one at a time, we instead pass them in batch. Be aware that the corresponding 832 // ResultOrException instance that matches each Put or Delete is then added down in the 833 // doNonAtomicBatchOp call. 
We should be staying aligned though the Put and Delete are 834 // deferred/batched 835 List<ClientProtos.Action> mutations = null; 836 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); 837 IOException sizeIOE = null; 838 Object lastBlock = null; 839 ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = ResultOrException.newBuilder(); 840 boolean hasResultOrException = false; 841 for (ClientProtos.Action action : actions.getActionList()) { 842 hasResultOrException = false; 843 resultOrExceptionBuilder.clear(); 844 try { 845 Result r = null; 846 847 if (context != null 848 && context.isRetryImmediatelySupported() 849 && (context.getResponseCellSize() > maxQuotaResultSize 850 || context.getResponseBlockSize() + context.getResponseExceptionSize() 851 > maxQuotaResultSize)) { 852 853 // We're storing the exception since the exception and reason string won't 854 // change after the response size limit is reached. 855 if (sizeIOE == null ) { 856 // We don't need the stack un-winding do don't throw the exception. 857 // Throwing will kill the JVM's JIT. 858 // 859 // Instead just create the exception and then store it. 860 sizeIOE = new MultiActionResultTooLarge("Max size exceeded" 861 + " CellSize: " + context.getResponseCellSize() 862 + " BlockSize: " + context.getResponseBlockSize()); 863 864 // Only report the exception once since there's only one request that 865 // caused the exception. Otherwise this number will dominate the exceptions count. 866 rpcServer.getMetrics().exception(sizeIOE); 867 } 868 869 // Now that there's an exception is known to be created 870 // use it for the response. 871 // 872 // This will create a copy in the builder. 
873 NameBytesPair pair = ResponseConverter.buildException(sizeIOE); 874 resultOrExceptionBuilder.setException(pair); 875 context.incrementResponseExceptionSize(pair.getSerializedSize()); 876 resultOrExceptionBuilder.setIndex(action.getIndex()); 877 builder.addResultOrException(resultOrExceptionBuilder.build()); 878 skipCellsForMutation(action, cellScanner); 879 continue; 880 } 881 if (action.hasGet()) { 882 long before = EnvironmentEdgeManager.currentTime(); 883 ClientProtos.Get pbGet = action.getGet(); 884 // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do 885 // a get closest before. Throwing the UnknownProtocolException signals it that it needs 886 // to switch and do hbase2 protocol (HBase servers do not tell clients what versions 887 // they are; its a problem for non-native clients like asynchbase. HBASE-20225. 888 if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) { 889 throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? 
" + 890 "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " + 891 "reverse Scan."); 892 } 893 try { 894 Get get = ProtobufUtil.toGet(pbGet); 895 if (context != null) { 896 r = get(get, (region), closeCallBack, context); 897 } else { 898 r = region.get(get); 899 } 900 } finally { 901 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 902 if (metricsRegionServer != null) { 903 metricsRegionServer.updateGet( 904 region.getTableDescriptor().getTableName(), 905 EnvironmentEdgeManager.currentTime() - before); 906 } 907 } 908 } else if (action.hasServiceCall()) { 909 hasResultOrException = true; 910 com.google.protobuf.Message result = 911 execServiceOnRegion(region, action.getServiceCall()); 912 ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder = 913 ClientProtos.CoprocessorServiceResult.newBuilder(); 914 resultOrExceptionBuilder.setServiceResult( 915 serviceResultBuilder.setValue( 916 serviceResultBuilder.getValueBuilder() 917 .setName(result.getClass().getName()) 918 // TODO: Copy!!! 919 .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray())))); 920 } else if (action.hasMutation()) { 921 MutationType type = action.getMutation().getMutateType(); 922 if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null && 923 !mutations.isEmpty()) { 924 // Flush out any Puts or Deletes already collected. 
925 doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, 926 spaceQuotaEnforcement); 927 mutations.clear(); 928 } 929 switch (type) { 930 case APPEND: 931 r = append(region, quota, action.getMutation(), cellScanner, nonceGroup, 932 spaceQuotaEnforcement); 933 break; 934 case INCREMENT: 935 r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup, 936 spaceQuotaEnforcement); 937 break; 938 case PUT: 939 case DELETE: 940 // Collect the individual mutations and apply in a batch 941 if (mutations == null) { 942 mutations = new ArrayList<>(actions.getActionCount()); 943 } 944 mutations.add(action); 945 break; 946 default: 947 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); 948 } 949 } else { 950 throw new HBaseIOException("Unexpected Action type"); 951 } 952 if (r != null) { 953 ClientProtos.Result pbResult = null; 954 if (isClientCellBlockSupport(context)) { 955 pbResult = ProtobufUtil.toResultNoData(r); 956 // Hard to guess the size here. Just make a rough guess. 957 if (cellsToReturn == null) { 958 cellsToReturn = new ArrayList<>(); 959 } 960 cellsToReturn.add(r); 961 } else { 962 pbResult = ProtobufUtil.toResult(r); 963 } 964 lastBlock = addSize(context, r, lastBlock); 965 hasResultOrException = true; 966 resultOrExceptionBuilder.setResult(pbResult); 967 } 968 // Could get to here and there was no result and no exception. Presumes we added 969 // a Put or Delete to the collecting Mutations List for adding later. In this 970 // case the corresponding ResultOrException instance for the Put or Delete will be added 971 // down in the doNonAtomicBatchOp method call rather than up here. 
972 } catch (IOException ie) { 973 rpcServer.getMetrics().exception(ie); 974 hasResultOrException = true; 975 NameBytesPair pair = ResponseConverter.buildException(ie); 976 resultOrExceptionBuilder.setException(pair); 977 context.incrementResponseExceptionSize(pair.getSerializedSize()); 978 } 979 if (hasResultOrException) { 980 // Propagate index. 981 resultOrExceptionBuilder.setIndex(action.getIndex()); 982 builder.addResultOrException(resultOrExceptionBuilder.build()); 983 } 984 } 985 // Finish up any outstanding mutations 986 if (!CollectionUtils.isEmpty(mutations)) { 987 doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); 988 } 989 return cellsToReturn; 990 } 991 992 private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException { 993 if (r.maxCellSize > 0) { 994 CellScanner cells = m.cellScanner(); 995 while (cells.advance()) { 996 int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current()); 997 if (size > r.maxCellSize) { 998 String msg = "Cell with size " + size + " exceeds limit of " + r.maxCellSize + " bytes"; 999 LOG.debug(msg); 1000 throw new DoNotRetryIOException(msg); 1001 } 1002 } 1003 } 1004 } 1005 1006 private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, 1007 final OperationQuota quota, final List<ClientProtos.Action> mutations, 1008 final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) 1009 throws IOException { 1010 // Just throw the exception. The exception will be caught and then added to region-level 1011 // exception for RegionAction. Leaving the null to action result is ok since the null 1012 // result is viewed as failure by hbase client. And the region-lever exception will be used 1013 // to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and 1014 // AsyncBatchRpcRetryingCaller#onComplete for more details. 
1015 doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, true); 1016 } 1017 1018 private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, 1019 final OperationQuota quota, final List<ClientProtos.Action> mutations, 1020 final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) { 1021 try { 1022 doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, false); 1023 } catch (IOException e) { 1024 // Set the exception for each action. The mutations in same RegionAction are group to 1025 // different batch and then be processed individually. Hence, we don't set the region-level 1026 // exception here for whole RegionAction. 1027 for (Action mutation : mutations) { 1028 builder.addResultOrException(getResultOrException(e, mutation.getIndex())); 1029 } 1030 } 1031 } 1032 1033 /** 1034 * Execute a list of Put/Delete mutations. 1035 * 1036 * @param builder 1037 * @param region 1038 * @param mutations 1039 */ 1040 private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region, 1041 final OperationQuota quota, final List<ClientProtos.Action> mutations, 1042 final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic) 1043 throws IOException { 1044 Mutation[] mArray = new Mutation[mutations.size()]; 1045 long before = EnvironmentEdgeManager.currentTime(); 1046 boolean batchContainsPuts = false, batchContainsDelete = false; 1047 try { 1048 /** HBASE-17924 1049 * mutationActionMap is a map to map the relation between mutations and actions 1050 * since mutation array may have been reoredered.In order to return the right 1051 * result or exception to the corresponding actions, We need to know which action 1052 * is the mutation belong to. We can't sort ClientProtos.Action array, since they 1053 * are bonded to cellscanners. 
1054 */ 1055 Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>(); 1056 int i = 0; 1057 for (ClientProtos.Action action: mutations) { 1058 if (action.hasGet()) { 1059 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + 1060 action.getGet()); 1061 } 1062 MutationProto m = action.getMutation(); 1063 Mutation mutation; 1064 if (m.getMutateType() == MutationType.PUT) { 1065 mutation = ProtobufUtil.toPut(m, cells); 1066 batchContainsPuts = true; 1067 } else { 1068 mutation = ProtobufUtil.toDelete(m, cells); 1069 batchContainsDelete = true; 1070 } 1071 mutationActionMap.put(mutation, action); 1072 mArray[i++] = mutation; 1073 checkCellSizeLimit(region, mutation); 1074 // Check if a space quota disallows this mutation 1075 spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation); 1076 quota.addMutation(mutation); 1077 } 1078 1079 if (!region.getRegionInfo().isMetaRegion()) { 1080 regionServer.getMemStoreFlusher().reclaimMemStoreMemory(); 1081 } 1082 1083 // HBASE-17924 1084 // Sort to improve lock efficiency for non-atomic batch of operations. If atomic 1085 // order is preserved as its expected from the client 1086 if (!atomic) { 1087 Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2)); 1088 } 1089 1090 OperationStatus[] codes = region.batchMutate(mArray, atomic, HConstants.NO_NONCE, 1091 HConstants.NO_NONCE); 1092 for (i = 0; i < codes.length; i++) { 1093 Mutation currentMutation = mArray[i]; 1094 ClientProtos.Action currentAction = mutationActionMap.get(currentMutation); 1095 int index = currentAction.hasIndex() || !atomic ? 
currentAction.getIndex() : i; 1096 Exception e = null; 1097 switch (codes[i].getOperationStatusCode()) { 1098 case BAD_FAMILY: 1099 e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg()); 1100 builder.addResultOrException(getResultOrException(e, index)); 1101 break; 1102 1103 case SANITY_CHECK_FAILURE: 1104 e = new FailedSanityCheckException(codes[i].getExceptionMsg()); 1105 builder.addResultOrException(getResultOrException(e, index)); 1106 break; 1107 1108 default: 1109 e = new DoNotRetryIOException(codes[i].getExceptionMsg()); 1110 builder.addResultOrException(getResultOrException(e, index)); 1111 break; 1112 1113 case SUCCESS: 1114 builder.addResultOrException(getResultOrException( 1115 ClientProtos.Result.getDefaultInstance(), index)); 1116 break; 1117 1118 case STORE_TOO_BUSY: 1119 e = new RegionTooBusyException(codes[i].getExceptionMsg()); 1120 builder.addResultOrException(getResultOrException(e, index)); 1121 break; 1122 } 1123 } 1124 } finally { 1125 int processedMutationIndex = 0; 1126 for (Action mutation : mutations) { 1127 // The non-null mArray[i] means the cell scanner has been read. 
1128 if (mArray[processedMutationIndex++] == null) { 1129 skipCellsForMutation(mutation, cells); 1130 } 1131 } 1132 updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete); 1133 } 1134 } 1135 1136 private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts, 1137 boolean batchContainsDelete) { 1138 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 1139 if (metricsRegionServer != null) { 1140 long after = EnvironmentEdgeManager.currentTime(); 1141 if (batchContainsPuts) { 1142 metricsRegionServer.updatePutBatch( 1143 region.getTableDescriptor().getTableName(), after - starttime); 1144 } 1145 if (batchContainsDelete) { 1146 metricsRegionServer.updateDeleteBatch( 1147 region.getTableDescriptor().getTableName(), after - starttime); 1148 } 1149 } 1150 } 1151 1152 /** 1153 * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of 1154 * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse. 
1155 * @param region 1156 * @param mutations 1157 * @param replaySeqId 1158 * @return an array of OperationStatus which internally contains the OperationStatusCode and the 1159 * exceptionMessage if any 1160 * @throws IOException 1161 */ 1162 private OperationStatus [] doReplayBatchOp(final HRegion region, 1163 final List<MutationReplay> mutations, long replaySeqId) throws IOException { 1164 long before = EnvironmentEdgeManager.currentTime(); 1165 boolean batchContainsPuts = false, batchContainsDelete = false; 1166 try { 1167 for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) { 1168 MutationReplay m = it.next(); 1169 1170 if (m.getType() == MutationType.PUT) { 1171 batchContainsPuts = true; 1172 } else { 1173 batchContainsDelete = true; 1174 } 1175 1176 NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap(); 1177 List<Cell> metaCells = map.get(WALEdit.METAFAMILY); 1178 if (metaCells != null && !metaCells.isEmpty()) { 1179 for (Cell metaCell : metaCells) { 1180 CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell); 1181 boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); 1182 HRegion hRegion = region; 1183 if (compactionDesc != null) { 1184 // replay the compaction. 
Remove the files from stores only if we are the primary 1185 // region replica (thus own the files) 1186 hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica, 1187 replaySeqId); 1188 continue; 1189 } 1190 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell); 1191 if (flushDesc != null && !isDefaultReplica) { 1192 hRegion.replayWALFlushMarker(flushDesc, replaySeqId); 1193 continue; 1194 } 1195 RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell); 1196 if (regionEvent != null && !isDefaultReplica) { 1197 hRegion.replayWALRegionEventMarker(regionEvent); 1198 continue; 1199 } 1200 BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell); 1201 if (bulkLoadEvent != null) { 1202 hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent); 1203 continue; 1204 } 1205 } 1206 it.remove(); 1207 } 1208 } 1209 requestCount.increment(); 1210 if (!region.getRegionInfo().isMetaRegion()) { 1211 regionServer.getMemStoreFlusher().reclaimMemStoreMemory(); 1212 } 1213 return region.batchReplay(mutations.toArray( 1214 new MutationReplay[mutations.size()]), replaySeqId); 1215 } finally { 1216 updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete); 1217 } 1218 } 1219 1220 private void closeAllScanners() { 1221 // Close any outstanding scanners. Means they'll get an UnknownScanner 1222 // exception next time they come in. 
1223 for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) { 1224 try { 1225 e.getValue().s.close(); 1226 } catch (IOException ioe) { 1227 LOG.warn("Closing scanner " + e.getKey(), ioe); 1228 } 1229 } 1230 } 1231 1232 // Exposed for testing 1233 interface LogDelegate { 1234 void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThreshold); 1235 } 1236 1237 private static LogDelegate DEFAULT_LOG_DELEGATE = new LogDelegate() { 1238 @Override 1239 public void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThreshold) { 1240 if (LOG.isWarnEnabled()) { 1241 LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold 1242 + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: " 1243 + RpcServer.getRequestUserName().orElse(null) + "/" 1244 + RpcServer.getRemoteAddress().orElse(null) 1245 + " first region in multi=" + firstRegionName); 1246 } 1247 } 1248 }; 1249 1250 private final LogDelegate ld; 1251 1252 public RSRpcServices(final HRegionServer rs) throws IOException { 1253 this(rs, DEFAULT_LOG_DELEGATE); 1254 } 1255 1256 // Directly invoked only for testing 1257 RSRpcServices(final HRegionServer rs, final LogDelegate ld) throws IOException { 1258 final Configuration conf = rs.getConfiguration(); 1259 this.ld = ld; 1260 regionServer = rs; 1261 rowSizeWarnThreshold = conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT); 1262 rejectRowsWithSizeOverThreshold = 1263 conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD); 1264 1265 final RpcSchedulerFactory rpcSchedulerFactory; 1266 try { 1267 rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class) 1268 .getDeclaredConstructor().newInstance(); 1269 } catch (NoSuchMethodException | InvocationTargetException | 1270 InstantiationException | IllegalAccessException e) { 1271 throw new IllegalArgumentException(e); 1272 } 1273 // Server to handle client requests. 
1274 final InetSocketAddress initialIsa; 1275 final InetSocketAddress bindAddress; 1276 if(this instanceof MasterRpcServices) { 1277 String hostname = DNS.getHostname(conf, DNS.ServerType.MASTER); 1278 int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT); 1279 // Creation of a HSA will force a resolve. 1280 initialIsa = new InetSocketAddress(hostname, port); 1281 bindAddress = new InetSocketAddress(conf.get("hbase.master.ipc.address", hostname), port); 1282 } else { 1283 String hostname = DNS.getHostname(conf, DNS.ServerType.REGIONSERVER); 1284 int port = conf.getInt(HConstants.REGIONSERVER_PORT, 1285 HConstants.DEFAULT_REGIONSERVER_PORT); 1286 // Creation of a HSA will force a resolve. 1287 initialIsa = new InetSocketAddress(hostname, port); 1288 bindAddress = 1289 new InetSocketAddress(conf.get("hbase.regionserver.ipc.address", hostname), port); 1290 } 1291 if (initialIsa.getAddress() == null) { 1292 throw new IllegalArgumentException("Failed resolve of " + initialIsa); 1293 } 1294 priority = createPriority(); 1295 // Using Address means we don't get the IP too. Shorten it more even to just the host name 1296 // w/o the domain. 1297 final String name = rs.getProcessName() + "/" + 1298 Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain(); 1299 // Set how many times to retry talking to another server over Connection. 
1300 ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG); 1301 rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name); 1302 rpcServer.setRsRpcServices(this); 1303 if (!(rs instanceof HMaster)) { 1304 rpcServer.setSlowLogRecorder(rs.getSlowLogRecorder()); 1305 } 1306 scannerLeaseTimeoutPeriod = conf.getInt( 1307 HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 1308 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); 1309 maxScannerResultSize = conf.getLong( 1310 HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 1311 HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); 1312 rpcTimeout = conf.getInt( 1313 HConstants.HBASE_RPC_TIMEOUT_KEY, 1314 HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 1315 minimumScanTimeLimitDelta = conf.getLong( 1316 REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, 1317 DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA); 1318 1319 final InetSocketAddress address = rpcServer.getListenerAddress(); 1320 if (address == null) { 1321 throw new IOException("Listener channel is closed"); 1322 } 1323 // Set our address, however we need the final port that was given to rpcServer 1324 isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort()); 1325 rpcServer.setErrorHandler(this); 1326 rs.setName(name); 1327 1328 closedScanners = CacheBuilder.newBuilder() 1329 .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build(); 1330 } 1331 1332 protected RpcServerInterface createRpcServer( 1333 final Server server, 1334 final RpcSchedulerFactory rpcSchedulerFactory, 1335 final InetSocketAddress bindAddress, 1336 final String name 1337 ) throws IOException { 1338 final Configuration conf = server.getConfiguration(); 1339 boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true); 1340 try { 1341 return RpcServerFactory.createRpcServer(server, name, getServices(), 1342 bindAddress, // use final bindAddress for this server. 
1343 conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled); 1344 } catch (BindException be) { 1345 throw new IOException(be.getMessage() + ". To switch ports use the '" 1346 + HConstants.REGIONSERVER_PORT + "' configuration property.", 1347 be.getCause() != null ? be.getCause() : be); 1348 } 1349 } 1350 1351 protected Class<?> getRpcSchedulerFactoryClass() { 1352 final Configuration conf = regionServer.getConfiguration(); 1353 return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, 1354 SimpleRpcSchedulerFactory.class); 1355 } 1356 1357 @Override 1358 public void onConfigurationChange(Configuration newConf) { 1359 if (rpcServer instanceof ConfigurationObserver) { 1360 ((ConfigurationObserver)rpcServer).onConfigurationChange(newConf); 1361 } 1362 } 1363 1364 protected PriorityFunction createPriority() { 1365 return new AnnotationReadingPriorityFunction(this); 1366 } 1367 1368 protected void requirePermission(String request, Permission.Action perm) throws IOException { 1369 if (accessChecker != null) { 1370 accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), request, null, perm); 1371 } 1372 } 1373 1374 @VisibleForTesting 1375 public int getScannersCount() { 1376 return scanners.size(); 1377 } 1378 1379 public 1380 RegionScanner getScanner(long scannerId) { 1381 String scannerIdString = Long.toString(scannerId); 1382 RegionScannerHolder scannerHolder = scanners.get(scannerIdString); 1383 if (scannerHolder != null) { 1384 return scannerHolder.s; 1385 } 1386 return null; 1387 } 1388 1389 public String getScanDetailsWithId(long scannerId) { 1390 RegionScanner scanner = getScanner(scannerId); 1391 if (scanner == null) { 1392 return null; 1393 } 1394 StringBuilder builder = new StringBuilder(); 1395 builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString()); 1396 builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString()); 1397 return builder.toString(); 1398 } 1399 1400 
public String getScanDetailsWithRequest(ScanRequest request) { 1401 try { 1402 if (!request.hasRegion()) { 1403 return null; 1404 } 1405 Region region = getRegion(request.getRegion()); 1406 StringBuilder builder = new StringBuilder(); 1407 builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString()); 1408 builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString()); 1409 return builder.toString(); 1410 } catch (IOException ignored) { 1411 return null; 1412 } 1413 } 1414 1415 /** 1416 * Get the vtime associated with the scanner. 1417 * Currently the vtime is the number of "next" calls. 1418 */ 1419 long getScannerVirtualTime(long scannerId) { 1420 String scannerIdString = Long.toString(scannerId); 1421 RegionScannerHolder scannerHolder = scanners.get(scannerIdString); 1422 if (scannerHolder != null) { 1423 return scannerHolder.getNextCallSeq(); 1424 } 1425 return 0L; 1426 } 1427 1428 /** 1429 * Method to account for the size of retained cells and retained data blocks. 1430 * @param context rpc call context 1431 * @param r result to add size. 1432 * @param lastBlock last block to check whether we need to add the block size in context. 1433 * @return an object that represents the last referenced block from this response. 1434 */ 1435 Object addSize(RpcCallContext context, Result r, Object lastBlock) { 1436 if (context != null && r != null && !r.isEmpty()) { 1437 for (Cell c : r.rawCells()) { 1438 context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c)); 1439 1440 // Since byte buffers can point all kinds of crazy places it's harder to keep track 1441 // of which blocks are kept alive by what byte buffer. 1442 // So we make a guess. 
1443 if (c instanceof ByteBufferExtendedCell) { 1444 ByteBufferExtendedCell bbCell = (ByteBufferExtendedCell) c; 1445 ByteBuffer bb = bbCell.getValueByteBuffer(); 1446 if (bb != lastBlock) { 1447 context.incrementResponseBlockSize(bb.capacity()); 1448 lastBlock = bb; 1449 } 1450 } else { 1451 // We're using the last block being the same as the current block as 1452 // a proxy for pointing to a new block. This won't be exact. 1453 // If there are multiple gets that bounce back and forth 1454 // Then it's possible that this will over count the size of 1455 // referenced blocks. However it's better to over count and 1456 // use two rpcs than to OOME the regionserver. 1457 byte[] valueArray = c.getValueArray(); 1458 if (valueArray != lastBlock) { 1459 context.incrementResponseBlockSize(valueArray.length); 1460 lastBlock = valueArray; 1461 } 1462 } 1463 1464 } 1465 } 1466 return lastBlock; 1467 } 1468 1469 private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper, 1470 HRegion r, boolean needCursor) throws LeaseStillHeldException { 1471 Lease lease = regionServer.getLeaseManager().createLease( 1472 scannerName, this.scannerLeaseTimeoutPeriod, new ScannerListener(scannerName)); 1473 RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease); 1474 RpcCallback closeCallback; 1475 if (s instanceof RpcCallback) { 1476 closeCallback = (RpcCallback) s; 1477 } else { 1478 closeCallback = new RegionScannerCloseCallBack(s); 1479 } 1480 RegionScannerHolder rsh = 1481 new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback, needCursor); 1482 RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh); 1483 assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! 
" + 1484 scannerName; 1485 return rsh; 1486 } 1487 1488 /** 1489 * Find the HRegion based on a region specifier 1490 * 1491 * @param regionSpecifier the region specifier 1492 * @return the corresponding region 1493 * @throws IOException if the specifier is not null, 1494 * but failed to find the region 1495 */ 1496 @VisibleForTesting 1497 public HRegion getRegion( 1498 final RegionSpecifier regionSpecifier) throws IOException { 1499 return regionServer.getRegion(regionSpecifier.getValue().toByteArray()); 1500 } 1501 1502 /** 1503 * Find the List of HRegions based on a list of region specifiers 1504 * 1505 * @param regionSpecifiers the list of region specifiers 1506 * @return the corresponding list of regions 1507 * @throws IOException if any of the specifiers is not null, 1508 * but failed to find the region 1509 */ 1510 private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers, 1511 final CacheEvictionStatsBuilder stats) { 1512 List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size()); 1513 for (RegionSpecifier regionSpecifier: regionSpecifiers) { 1514 try { 1515 regions.add(regionServer.getRegion(regionSpecifier.getValue().toByteArray())); 1516 } catch (NotServingRegionException e) { 1517 stats.addException(regionSpecifier.getValue().toByteArray(), e); 1518 } 1519 } 1520 return regions; 1521 } 1522 1523 @VisibleForTesting 1524 public PriorityFunction getPriority() { 1525 return priority; 1526 } 1527 1528 @VisibleForTesting 1529 public Configuration getConfiguration() { 1530 return regionServer.getConfiguration(); 1531 } 1532 1533 private RegionServerRpcQuotaManager getRpcQuotaManager() { 1534 return regionServer.getRegionServerRpcQuotaManager(); 1535 } 1536 1537 private RegionServerSpaceQuotaManager getSpaceQuotaManager() { 1538 return regionServer.getRegionServerSpaceQuotaManager(); 1539 } 1540 1541 void start(ZKWatcher zkWatcher) { 1542 if (AccessChecker.isAuthorizationSupported(getConfiguration())) { 1543 accessChecker 
= new AccessChecker(getConfiguration()); 1544 } else { 1545 accessChecker = new NoopAccessChecker(getConfiguration()); 1546 } 1547 if (!getConfiguration().getBoolean("hbase.testing.nocluster", false) && zkWatcher != null) { 1548 zkPermissionWatcher = 1549 new ZKPermissionWatcher(zkWatcher, accessChecker.getAuthManager(), getConfiguration()); 1550 try { 1551 zkPermissionWatcher.start(); 1552 } catch (KeeperException e) { 1553 LOG.error("ZooKeeper permission watcher initialization failed", e); 1554 } 1555 } 1556 this.scannerIdGenerator = new ScannerIdGenerator(this.regionServer.serverName); 1557 rpcServer.start(); 1558 } 1559 1560 void stop() { 1561 if (zkPermissionWatcher != null) { 1562 zkPermissionWatcher.close(); 1563 } 1564 closeAllScanners(); 1565 rpcServer.stop(); 1566 } 1567 1568 /** 1569 * Called to verify that this server is up and running. 1570 */ 1571 // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name). 1572 protected void checkOpen() throws IOException { 1573 if (regionServer.isAborted()) { 1574 throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting"); 1575 } 1576 if (regionServer.isStopped()) { 1577 throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping"); 1578 } 1579 if (!regionServer.isDataFileSystemOk()) { 1580 throw new RegionServerStoppedException("File system not available"); 1581 } 1582 if (!regionServer.isOnline()) { 1583 throw new ServerNotRunningYetException("Server " + regionServer.serverName 1584 + " is not running yet"); 1585 } 1586 } 1587 1588 /** 1589 * By default, put up an Admin and a Client Service. 1590 * Set booleans <code>hbase.regionserver.admin.executorService</code> and 1591 * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services. 1592 * Default is that both are enabled. 
1593 * @return immutable list of blocking services and the security info classes that this server 1594 * supports 1595 */ 1596 protected List<BlockingServiceAndInterface> getServices() { 1597 boolean admin = 1598 getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true); 1599 boolean client = 1600 getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true); 1601 List<BlockingServiceAndInterface> bssi = new ArrayList<>(); 1602 if (client) { 1603 bssi.add(new BlockingServiceAndInterface( 1604 ClientService.newReflectiveBlockingService(this), 1605 ClientService.BlockingInterface.class)); 1606 } 1607 if (admin) { 1608 bssi.add(new BlockingServiceAndInterface( 1609 AdminService.newReflectiveBlockingService(this), 1610 AdminService.BlockingInterface.class)); 1611 } 1612 return new org.apache.hbase.thirdparty.com.google.common.collect. 1613 ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build(); 1614 } 1615 1616 public InetSocketAddress getSocketAddress() { 1617 return isa; 1618 } 1619 1620 @Override 1621 public int getPriority(RequestHeader header, Message param, User user) { 1622 return priority.getPriority(header, param, user); 1623 } 1624 1625 @Override 1626 public long getDeadline(RequestHeader header, Message param) { 1627 return priority.getDeadline(header, param); 1628 } 1629 1630 /* 1631 * Check if an OOME and, if so, abort immediately to avoid creating more objects. 1632 * 1633 * @param e 1634 * 1635 * @return True if we OOME'd and are aborting. 
1636 */ 1637 @Override 1638 public boolean checkOOME(final Throwable e) { 1639 return exitIfOOME(e); 1640 } 1641 1642 public static boolean exitIfOOME(final Throwable e ){ 1643 boolean stop = false; 1644 try { 1645 if (e instanceof OutOfMemoryError 1646 || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError) 1647 || (e.getMessage() != null && e.getMessage().contains( 1648 "java.lang.OutOfMemoryError"))) { 1649 stop = true; 1650 LOG.error(HBaseMarkers.FATAL, "Run out of memory; " 1651 + RSRpcServices.class.getSimpleName() + " will abort itself immediately", 1652 e); 1653 } 1654 } finally { 1655 if (stop) { 1656 Runtime.getRuntime().halt(1); 1657 } 1658 } 1659 return stop; 1660 } 1661 1662 /** 1663 * Close a region on the region server. 1664 * 1665 * @param controller the RPC controller 1666 * @param request the request 1667 * @throws ServiceException 1668 */ 1669 @Override 1670 @QosPriority(priority=HConstants.ADMIN_QOS) 1671 public CloseRegionResponse closeRegion(final RpcController controller, 1672 final CloseRegionRequest request) throws ServiceException { 1673 final ServerName sn = (request.hasDestinationServer() ? 1674 ProtobufUtil.toServerName(request.getDestinationServer()) : null); 1675 1676 try { 1677 checkOpen(); 1678 throwOnWrongStartCode(request); 1679 final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion()); 1680 1681 requestCount.increment(); 1682 if (sn == null) { 1683 LOG.info("Close " + encodedRegionName + " without moving"); 1684 } else { 1685 LOG.info("Close " + encodedRegionName + ", moving to " + sn); 1686 } 1687 boolean closed = regionServer.closeRegion(encodedRegionName, false, sn); 1688 CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed); 1689 return builder.build(); 1690 } catch (IOException ie) { 1691 throw new ServiceException(ie); 1692 } 1693 } 1694 1695 /** 1696 * Compact a region on the region server. 
1697 * 1698 * @param controller the RPC controller 1699 * @param request the request 1700 * @throws ServiceException 1701 */ 1702 @Override 1703 @QosPriority(priority = HConstants.ADMIN_QOS) 1704 public CompactRegionResponse compactRegion(final RpcController controller, 1705 final CompactRegionRequest request) throws ServiceException { 1706 try { 1707 checkOpen(); 1708 requestCount.increment(); 1709 HRegion region = getRegion(request.getRegion()); 1710 // Quota support is enabled, the requesting user is not system/super user 1711 // and a quota policy is enforced that disables compactions. 1712 if (QuotaUtil.isQuotaEnabled(getConfiguration()) && 1713 !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null)) && 1714 this.regionServer.getRegionServerSpaceQuotaManager() 1715 .areCompactionsDisabled(region.getTableDescriptor().getTableName())) { 1716 throw new DoNotRetryIOException( 1717 "Compactions on this region are " + "disabled due to a space quota violation."); 1718 } 1719 region.startRegionOperation(Operation.COMPACT_REGION); 1720 LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString()); 1721 boolean major = request.hasMajor() && request.getMajor(); 1722 if (request.hasFamily()) { 1723 byte[] family = request.getFamily().toByteArray(); 1724 String log = "User-triggered " + (major ? "major " : "") + "compaction for region " + 1725 region.getRegionInfo().getRegionNameAsString() + " and family " + 1726 Bytes.toString(family); 1727 LOG.trace(log); 1728 region.requestCompaction(family, log, Store.PRIORITY_USER, major, 1729 CompactionLifeCycleTracker.DUMMY); 1730 } else { 1731 String log = "User-triggered " + (major ? 
"major " : "") + "compaction for region " + 1732 region.getRegionInfo().getRegionNameAsString(); 1733 LOG.trace(log); 1734 region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY); 1735 } 1736 return CompactRegionResponse.newBuilder().build(); 1737 } catch (IOException ie) { 1738 throw new ServiceException(ie); 1739 } 1740 } 1741 1742 @Override 1743 public CompactionSwitchResponse compactionSwitch(RpcController controller, 1744 CompactionSwitchRequest request) throws ServiceException { 1745 try { 1746 checkOpen(); 1747 requestCount.increment(); 1748 boolean prevState = regionServer.compactSplitThread.isCompactionsEnabled(); 1749 CompactionSwitchResponse response = 1750 CompactionSwitchResponse.newBuilder().setPrevState(prevState).build(); 1751 if (prevState == request.getEnabled()) { 1752 // passed in requested state is same as current state. No action required 1753 return response; 1754 } 1755 regionServer.compactSplitThread.switchCompaction(request.getEnabled()); 1756 return response; 1757 } catch (IOException ie) { 1758 throw new ServiceException(ie); 1759 } 1760 } 1761 1762 /** 1763 * Flush a region on the region server. 
1764 * 1765 * @param controller the RPC controller 1766 * @param request the request 1767 * @throws ServiceException 1768 */ 1769 @Override 1770 @QosPriority(priority=HConstants.ADMIN_QOS) 1771 public FlushRegionResponse flushRegion(final RpcController controller, 1772 final FlushRegionRequest request) throws ServiceException { 1773 try { 1774 checkOpen(); 1775 requestCount.increment(); 1776 HRegion region = getRegion(request.getRegion()); 1777 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); 1778 boolean shouldFlush = true; 1779 if (request.hasIfOlderThanTs()) { 1780 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs(); 1781 } 1782 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); 1783 if (shouldFlush) { 1784 boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ? 1785 request.getWriteFlushWalMarker() : false; 1786 // Go behind the curtain so we can manage writing of the flush WAL marker 1787 HRegion.FlushResultImpl flushResult = 1788 region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1789 boolean compactionNeeded = flushResult.isCompactionNeeded(); 1790 if (compactionNeeded) { 1791 regionServer.compactSplitThread.requestSystemCompaction(region, 1792 "Compaction through user triggered flush"); 1793 } 1794 builder.setFlushed(flushResult.isFlushSucceeded()); 1795 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker); 1796 } 1797 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores()); 1798 return builder.build(); 1799 } catch (DroppedSnapshotException ex) { 1800 // Cache flush can fail in a few places. If it fails in a critical 1801 // section, we get a DroppedSnapshotException and a replay of wal 1802 // is required. Currently the only way to do this is a restart of 1803 // the server. 1804 regionServer.abort("Replay of WAL required. 
Forcing server shutdown", ex); 1805 throw new ServiceException(ex); 1806 } catch (IOException ie) { 1807 throw new ServiceException(ie); 1808 } 1809 } 1810 1811 @Override 1812 @QosPriority(priority=HConstants.ADMIN_QOS) 1813 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, 1814 final GetOnlineRegionRequest request) throws ServiceException { 1815 try { 1816 checkOpen(); 1817 requestCount.increment(); 1818 Map<String, HRegion> onlineRegions = regionServer.getOnlineRegions(); 1819 List<RegionInfo> list = new ArrayList<>(onlineRegions.size()); 1820 for (HRegion region: onlineRegions.values()) { 1821 list.add(region.getRegionInfo()); 1822 } 1823 list.sort(RegionInfo.COMPARATOR); 1824 return ResponseConverter.buildGetOnlineRegionResponse(list); 1825 } catch (IOException ie) { 1826 throw new ServiceException(ie); 1827 } 1828 } 1829 1830 // Master implementation of this Admin Service differs given it is not 1831 // able to supply detail only known to RegionServer. See note on 1832 // MasterRpcServers#getRegionInfo. 1833 @Override 1834 @QosPriority(priority=HConstants.ADMIN_QOS) 1835 public GetRegionInfoResponse getRegionInfo(final RpcController controller, 1836 final GetRegionInfoRequest request) throws ServiceException { 1837 try { 1838 checkOpen(); 1839 requestCount.increment(); 1840 HRegion region = getRegion(request.getRegion()); 1841 RegionInfo info = region.getRegionInfo(); 1842 byte[] bestSplitRow = null; 1843 boolean shouldSplit = true; 1844 if (request.hasBestSplitRow() && request.getBestSplitRow()) { 1845 HRegion r = region; 1846 region.startRegionOperation(Operation.SPLIT_REGION); 1847 r.forceSplit(null); 1848 // Even after setting force split if split policy says no to split then we should not split. 
1849 shouldSplit = region.getSplitPolicy().shouldSplit() && !info.isMetaRegion(); 1850 bestSplitRow = r.checkSplit(); 1851 // when all table data are in memstore, bestSplitRow = null 1852 // try to flush region first 1853 if(bestSplitRow == null) { 1854 r.flush(true); 1855 bestSplitRow = r.checkSplit(); 1856 } 1857 r.clearSplit(); 1858 } 1859 GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder(); 1860 builder.setRegionInfo(ProtobufUtil.toRegionInfo(info)); 1861 if (request.hasCompactionState() && request.getCompactionState()) { 1862 builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState())); 1863 } 1864 builder.setSplittable(region.isSplittable() && shouldSplit); 1865 builder.setMergeable(region.isMergeable()); 1866 if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) { 1867 builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow)); 1868 } 1869 return builder.build(); 1870 } catch (IOException ie) { 1871 throw new ServiceException(ie); 1872 } 1873 } 1874 1875 @Override 1876 @QosPriority(priority=HConstants.ADMIN_QOS) 1877 public GetRegionLoadResponse getRegionLoad(RpcController controller, 1878 GetRegionLoadRequest request) throws ServiceException { 1879 1880 List<HRegion> regions; 1881 if (request.hasTableName()) { 1882 TableName tableName = ProtobufUtil.toTableName(request.getTableName()); 1883 regions = regionServer.getRegions(tableName); 1884 } else { 1885 regions = regionServer.getRegions(); 1886 } 1887 List<RegionLoad> rLoads = new ArrayList<>(regions.size()); 1888 RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder(); 1889 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder(); 1890 1891 try { 1892 for (HRegion region : regions) { 1893 rLoads.add(regionServer.createRegionLoad(region, regionLoadBuilder, regionSpecifier)); 1894 } 1895 } catch (IOException e) { 1896 throw new ServiceException(e); 1897 } 1898 
GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder(); 1899 builder.addAllRegionLoads(rLoads); 1900 return builder.build(); 1901 } 1902 1903 @Override 1904 @QosPriority(priority=HConstants.ADMIN_QOS) 1905 public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller, 1906 ClearCompactionQueuesRequest request) throws ServiceException { 1907 LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/" 1908 + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue"); 1909 ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder(); 1910 requestCount.increment(); 1911 if (clearCompactionQueues.compareAndSet(false,true)) { 1912 try { 1913 checkOpen(); 1914 regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues(); 1915 for (String queueName : request.getQueueNameList()) { 1916 LOG.debug("clear " + queueName + " compaction queue"); 1917 switch (queueName) { 1918 case "long": 1919 regionServer.compactSplitThread.clearLongCompactionsQueue(); 1920 break; 1921 case "short": 1922 regionServer.compactSplitThread.clearShortCompactionsQueue(); 1923 break; 1924 default: 1925 LOG.warn("Unknown queue name " + queueName); 1926 throw new IOException("Unknown queue name " + queueName); 1927 } 1928 } 1929 regionServer.getRegionServerCoprocessorHost().postClearCompactionQueues(); 1930 } catch (IOException ie) { 1931 throw new ServiceException(ie); 1932 } finally { 1933 clearCompactionQueues.set(false); 1934 } 1935 } else { 1936 LOG.warn("Clear compactions queue is executing by other admin."); 1937 } 1938 return respBuilder.build(); 1939 } 1940 1941 /** 1942 * Get some information of the region server. 
1943 * 1944 * @param controller the RPC controller 1945 * @param request the request 1946 * @throws ServiceException 1947 */ 1948 @Override 1949 @QosPriority(priority=HConstants.ADMIN_QOS) 1950 public GetServerInfoResponse getServerInfo(final RpcController controller, 1951 final GetServerInfoRequest request) throws ServiceException { 1952 try { 1953 checkOpen(); 1954 } catch (IOException ie) { 1955 throw new ServiceException(ie); 1956 } 1957 requestCount.increment(); 1958 int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1; 1959 return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort); 1960 } 1961 1962 @Override 1963 @QosPriority(priority=HConstants.ADMIN_QOS) 1964 public GetStoreFileResponse getStoreFile(final RpcController controller, 1965 final GetStoreFileRequest request) throws ServiceException { 1966 try { 1967 checkOpen(); 1968 HRegion region = getRegion(request.getRegion()); 1969 requestCount.increment(); 1970 Set<byte[]> columnFamilies; 1971 if (request.getFamilyCount() == 0) { 1972 columnFamilies = region.getTableDescriptor().getColumnFamilyNames(); 1973 } else { 1974 columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR); 1975 for (ByteString cf: request.getFamilyList()) { 1976 columnFamilies.add(cf.toByteArray()); 1977 } 1978 } 1979 int nCF = columnFamilies.size(); 1980 List<String> fileList = region.getStoreFileList( 1981 columnFamilies.toArray(new byte[nCF][])); 1982 GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder(); 1983 builder.addAllStoreFile(fileList); 1984 return builder.build(); 1985 } catch (IOException ie) { 1986 throw new ServiceException(ie); 1987 } 1988 } 1989 1990 private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException { 1991 if (!request.hasServerStartCode()) { 1992 LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList()); 1993 return; 1994 } 1995 
throwOnWrongStartCode(request.getServerStartCode()); 1996 } 1997 1998 private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException { 1999 if (!request.hasServerStartCode()) { 2000 LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion()); 2001 return; 2002 } 2003 throwOnWrongStartCode(request.getServerStartCode()); 2004 } 2005 2006 private void throwOnWrongStartCode(long serverStartCode) throws ServiceException { 2007 // check that we are the same server that this RPC is intended for. 2008 if (regionServer.serverName.getStartcode() != serverStartCode) { 2009 throw new ServiceException(new DoNotRetryIOException( 2010 "This RPC was intended for a " + "different server with startCode: " + serverStartCode + 2011 ", this server is: " + regionServer.serverName)); 2012 } 2013 } 2014 2015 private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException { 2016 if (req.getOpenRegionCount() > 0) { 2017 for (OpenRegionRequest openReq : req.getOpenRegionList()) { 2018 throwOnWrongStartCode(openReq); 2019 } 2020 } 2021 if (req.getCloseRegionCount() > 0) { 2022 for (CloseRegionRequest closeReq : req.getCloseRegionList()) { 2023 throwOnWrongStartCode(closeReq); 2024 } 2025 } 2026 } 2027 2028 /** 2029 * Open asynchronously a region or a set of regions on the region server. 2030 * 2031 * The opening is coordinated by ZooKeeper, and this method requires the znode to be created 2032 * before being called. As a consequence, this method should be called only from the master. 2033 * <p> 2034 * Different manages states for the region are: 2035 * </p><ul> 2036 * <li>region not opened: the region opening will start asynchronously.</li> 2037 * <li>a close is already in progress: this is considered as an error.</li> 2038 * <li>an open is already in progress: this new open request will be ignored. 
 * This is important because the Master can do multiple requests if it crashes.</li>
 * <li>the region is already opened: this new open request will be ignored.</li>
 * </ul>
 * <p>
 * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
 * For a single region opening, errors are sent through a ServiceException. For bulk assign,
 * errors are put in the response as FAILED_OPENING.
 * </p>
 * @param controller the RPC controller
 * @param request the request
 * @return a response with one opening state per region processed
 * @throws ServiceException on a start code mismatch, when the server is not open (after the
 *           grace period granted to a single-region meta assignment), when the wait is
 *           interrupted, or when a single-region open fails
 */
@Override
@QosPriority(priority=HConstants.ADMIN_QOS)
public OpenRegionResponse openRegion(final RpcController controller,
    final OpenRegionRequest request) throws ServiceException {
  requestCount.increment();
  throwOnWrongStartCode(request);

  OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
  final int regionCount = request.getOpenInfoCount();
  // Table descriptors already looked up while handling this request, keyed by table.
  final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
  final boolean isBulkAssign = regionCount > 1;
  try {
    checkOpen();
  } catch (IOException ie) {
    // The server is not open. Only a request that opens exactly one region, and whose table
    // is TableName.META_TABLE_NAME, is given a chance to wait; anything else fails right away.
    TableName tableName = null;
    if (regionCount == 1) {
      org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri = request.getOpenInfo(0).getRegion();
      if (ri != null) {
        tableName = ProtobufUtil.toTableName(ri.getTableName());
      }
    }
    if (!TableName.META_TABLE_NAME.equals(tableName)) {
      throw new ServiceException(ie);
    }
    // We are assigning meta, wait a little for regionserver to finish initialization.
    int timeout = regionServer.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout
    long endTime = System.currentTimeMillis() + timeout;
    synchronized (regionServer.online) {
      try {
        // Wait in msgInterval slices until the deadline passes, the server stops, or it
        // reports online; then re-run the open check, which throws if still not open.
        while (System.currentTimeMillis() <= endTime
          && !regionServer.isStopped() && !regionServer.isOnline()) {
          regionServer.online.wait(regionServer.msgInterval);
        }
        checkOpen();
      } catch (InterruptedException t) {
        // Restore the interrupt flag before reporting the failure.
        Thread.currentThread().interrupt();
        throw new ServiceException(t);
      } catch (IOException e) {
        throw new ServiceException(e);
      }
    }
  }

  // -1 is passed on when the request carries no master system time.
  long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;

  for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
    final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
    TableDescriptor htd;
    try {
      String encodedName = region.getEncodedName();
      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
      final HRegion onlineRegion = regionServer.getRegion(encodedName);
      if (onlineRegion != null) {
        // The region is already online. This should not happen any more.
        String error = "Received OPEN for the region:"
          + region.getRegionNameAsString() + ", which is already online";
        LOG.warn(error);
        //regionServer.abort(error);
        //throw new IOException(error);
        builder.addOpeningState(RegionOpeningState.OPENED);
        continue;
      }
      LOG.info("Open " + region.getRegionNameAsString());

      // putIfAbsent returns the entry that was already present, or null when this call
      // created the TRUE entry. Judging by the branches below, TRUE marks an open in
      // progress and FALSE a close in progress.
      final Boolean previous = regionServer.getRegionsInTransitionInRS().putIfAbsent(
        encodedNameBytes, Boolean.TRUE);

      if (Boolean.FALSE.equals(previous)) {
        if (regionServer.getRegion(encodedName) != null) {
          // There is a close in progress. This should not happen any more.
          String error = "Received OPEN for the region:"
            + region.getRegionNameAsString() + ", which we are already trying to CLOSE";
          regionServer.abort(error);
          throw new IOException(error);
        }
        // A FALSE entry exists but the region is not online: overwrite the entry with TRUE
        // and carry on with the open.
        regionServer.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
      }

      if (Boolean.TRUE.equals(previous)) {
        // An open is in progress. This is supported, but let's log this.
        LOG.info("Receiving OPEN for the region:" +
          region.getRegionNameAsString() + ", which we are already trying to OPEN"
            + " - ignoring this new request for this region.");
      }

      // We are opening this region. If it moves back and forth for whatever reason, we don't
      // want to keep returning the stale moved record while we are opening/if we close again.
      regionServer.removeFromMovedRegions(region.getEncodedName());

      // Submit a handler only when no open was already recorded (previous is null or FALSE).
      if (previous == null || !previous.booleanValue()) {
        htd = htds.get(region.getTable());
        if (htd == null) {
          htd = regionServer.tableDescriptors.get(region.getTable());
          htds.put(region.getTable(), htd);
        }
        if (htd == null) {
          throw new IOException("Missing table descriptor for " + region.getEncodedName());
        }
        // If there is no action in progress, we can submit a specific handler.
        // Need to pass the expected version in the constructor.
        if (regionServer.executorService == null) {
          LOG.info("No executor executorService; skipping open request");
        } else {
          if (region.isMetaRegion()) {
            regionServer.executorService.submit(new OpenMetaHandler(
            regionServer, regionServer, region, htd, masterSystemTime));
          } else {
            if (regionOpenInfo.getFavoredNodesCount() > 0) {
              regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
              regionOpenInfo.getFavoredNodesList());
            }
            // Tables with a priority of at least ADMIN_QOS, and system tables, get the
            // priority handler.
            if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
              regionServer.executorService.submit(new OpenPriorityRegionHandler(
              regionServer, regionServer, region, htd, masterSystemTime));
            } else {
              regionServer.executorService.submit(new OpenRegionHandler(
              regionServer, regionServer, region, htd, masterSystemTime));
            }
          }
        }
      }

      // OPENED is reported once the request has been accepted; the handlers above are only
      // submitted here, not awaited.
      builder.addOpeningState(RegionOpeningState.OPENED);
    } catch (IOException ie) {
      LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
      // Bulk assigns report the failure per region; a single-region open fails the call.
      if (isBulkAssign) {
        builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
      } else {
        throw new ServiceException(ie);
      }
    }
  }
  return builder.build();
}

/**
 * Warm up a region on this server.
 *
 * This method should only be called by Master. It synchronously opens the region and
 * closes the region bringing the most important pages in cache.
2192 * <p> 2193 * 2194 * @param controller the RPC controller 2195 * @param request the request 2196 * @throws ServiceException 2197 */ 2198 @Override 2199 public WarmupRegionResponse warmupRegion(final RpcController controller, 2200 final WarmupRegionRequest request) throws ServiceException { 2201 2202 final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo()); 2203 TableDescriptor htd; 2204 WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance(); 2205 2206 try { 2207 checkOpen(); 2208 String encodedName = region.getEncodedName(); 2209 byte[] encodedNameBytes = region.getEncodedNameAsBytes(); 2210 final HRegion onlineRegion = regionServer.getRegion(encodedName); 2211 2212 if (onlineRegion != null) { 2213 LOG.info("Region already online. Skipping warming up " + region); 2214 return response; 2215 } 2216 2217 htd = regionServer.tableDescriptors.get(region.getTable()); 2218 2219 if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) { 2220 LOG.info("Region is in transition. Skipping warmup " + region); 2221 return response; 2222 } 2223 2224 LOG.info("Warming up region " + region.getRegionNameAsString()); 2225 HRegion.warmupHRegion(region, htd, regionServer.getWAL(region), 2226 regionServer.getConfiguration(), regionServer, null); 2227 2228 } catch (IOException ie) { 2229 LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie); 2230 throw new ServiceException(ie); 2231 } 2232 2233 return response; 2234 } 2235 2236 /** 2237 * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is 2238 * that the given mutations will be durable on the receiving RS if this method returns without any 2239 * exception. 
2240 * @param controller the RPC controller 2241 * @param request the request 2242 * @throws ServiceException 2243 */ 2244 @Override 2245 @QosPriority(priority = HConstants.REPLAY_QOS) 2246 public ReplicateWALEntryResponse replay(final RpcController controller, 2247 final ReplicateWALEntryRequest request) throws ServiceException { 2248 long before = EnvironmentEdgeManager.currentTime(); 2249 CellScanner cells = ((HBaseRpcController) controller).cellScanner(); 2250 try { 2251 checkOpen(); 2252 List<WALEntry> entries = request.getEntryList(); 2253 if (entries == null || entries.isEmpty()) { 2254 // empty input 2255 return ReplicateWALEntryResponse.newBuilder().build(); 2256 } 2257 ByteString regionName = entries.get(0).getKey().getEncodedRegionName(); 2258 HRegion region = regionServer.getRegionByEncodedName(regionName.toStringUtf8()); 2259 RegionCoprocessorHost coprocessorHost = 2260 ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo()) 2261 ? region.getCoprocessorHost() 2262 : null; // do not invoke coprocessors if this is a secondary region replica 2263 List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>(); 2264 2265 // Skip adding the edits to WAL if this is a secondary region replica 2266 boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); 2267 Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL; 2268 2269 for (WALEntry entry : entries) { 2270 if (!regionName.equals(entry.getKey().getEncodedRegionName())) { 2271 throw new NotServingRegionException("Replay request contains entries from multiple " + 2272 "regions. First region:" + regionName.toStringUtf8() + " , other region:" 2273 + entry.getKey().getEncodedRegionName()); 2274 } 2275 if (regionServer.nonceManager != null && isPrimary) { 2276 long nonceGroup = entry.getKey().hasNonceGroup() 2277 ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; 2278 long nonce = entry.getKey().hasNonce() ? 
entry.getKey().getNonce() : HConstants.NO_NONCE; 2279 regionServer.nonceManager.reportOperationFromWal( 2280 nonceGroup, 2281 nonce, 2282 entry.getKey().getWriteTime()); 2283 } 2284 Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>(); 2285 List<MutationReplay> edits = WALSplitUtil.getMutationsFromWALEntry(entry, 2286 cells, walEntry, durability); 2287 if (coprocessorHost != null) { 2288 // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a 2289 // KeyValue. 2290 if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(), 2291 walEntry.getSecond())) { 2292 // if bypass this log entry, ignore it ... 2293 continue; 2294 } 2295 walEntries.add(walEntry); 2296 } 2297 if(edits!=null && !edits.isEmpty()) { 2298 // HBASE-17924 2299 // sort to improve lock efficiency 2300 Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation)); 2301 long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? 
2302 entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber(); 2303 OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId); 2304 // check if it's a partial success 2305 for (int i = 0; result != null && i < result.length; i++) { 2306 if (result[i] != OperationStatus.SUCCESS) { 2307 throw new IOException(result[i].getExceptionMsg()); 2308 } 2309 } 2310 } 2311 } 2312 2313 //sync wal at the end because ASYNC_WAL is used above 2314 WAL wal = region.getWAL(); 2315 if (wal != null) { 2316 wal.sync(); 2317 } 2318 2319 if (coprocessorHost != null) { 2320 for (Pair<WALKey, WALEdit> entry : walEntries) { 2321 coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(), 2322 entry.getSecond()); 2323 } 2324 } 2325 return ReplicateWALEntryResponse.newBuilder().build(); 2326 } catch (IOException ie) { 2327 throw new ServiceException(ie); 2328 } finally { 2329 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 2330 if (metricsRegionServer != null) { 2331 metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before); 2332 } 2333 } 2334 } 2335 2336 /** 2337 * Replicate WAL entries on the region server. 
2338 * 2339 * @param controller the RPC controller 2340 * @param request the request 2341 */ 2342 @Override 2343 @QosPriority(priority=HConstants.REPLICATION_QOS) 2344 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, 2345 final ReplicateWALEntryRequest request) throws ServiceException { 2346 try { 2347 checkOpen(); 2348 if (regionServer.getReplicationSinkService() != null) { 2349 requestCount.increment(); 2350 List<WALEntry> entries = request.getEntryList(); 2351 CellScanner cellScanner = ((HBaseRpcController)controller).cellScanner(); 2352 regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(); 2353 regionServer.getReplicationSinkService().replicateLogEntries(entries, cellScanner, 2354 request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), 2355 request.getSourceHFileArchiveDirPath()); 2356 regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(); 2357 return ReplicateWALEntryResponse.newBuilder().build(); 2358 } else { 2359 throw new ServiceException("Replication services are not initialized yet"); 2360 } 2361 } catch (IOException ie) { 2362 throw new ServiceException(ie); 2363 } 2364 } 2365 2366 /** 2367 * Roll the WAL writer of the region server. 
2368 * @param controller the RPC controller 2369 * @param request the request 2370 * @throws ServiceException 2371 */ 2372 @Override 2373 public RollWALWriterResponse rollWALWriter(final RpcController controller, 2374 final RollWALWriterRequest request) throws ServiceException { 2375 try { 2376 checkOpen(); 2377 requestCount.increment(); 2378 regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest(); 2379 regionServer.getWalRoller().requestRollAll(); 2380 regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest(); 2381 RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder(); 2382 return builder.build(); 2383 } catch (IOException ie) { 2384 throw new ServiceException(ie); 2385 } 2386 } 2387 2388 2389 /** 2390 * Stop the region server. 2391 * 2392 * @param controller the RPC controller 2393 * @param request the request 2394 * @throws ServiceException 2395 */ 2396 @Override 2397 @QosPriority(priority=HConstants.ADMIN_QOS) 2398 public StopServerResponse stopServer(final RpcController controller, 2399 final StopServerRequest request) throws ServiceException { 2400 requestCount.increment(); 2401 String reason = request.getReason(); 2402 regionServer.stop(reason); 2403 return StopServerResponse.newBuilder().build(); 2404 } 2405 2406 @Override 2407 public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller, 2408 UpdateFavoredNodesRequest request) throws ServiceException { 2409 List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList(); 2410 UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder(); 2411 for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) { 2412 RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion()); 2413 if (regionUpdateInfo.getFavoredNodesCount() > 0) { 2414 regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(), 2415 regionUpdateInfo.getFavoredNodesList()); 2416 } 2417 
} 2418 respBuilder.setResponse(openInfoList.size()); 2419 return respBuilder.build(); 2420 } 2421 2422 /** 2423 * Atomically bulk load several HFiles into an open region 2424 * @return true if successful, false is failed but recoverably (no action) 2425 * @throws ServiceException if failed unrecoverably 2426 */ 2427 @Override 2428 public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller, 2429 final BulkLoadHFileRequest request) throws ServiceException { 2430 long start = EnvironmentEdgeManager.currentTime(); 2431 List<String> clusterIds = new ArrayList<>(request.getClusterIdsList()); 2432 if(clusterIds.contains(this.regionServer.clusterId)){ 2433 return BulkLoadHFileResponse.newBuilder().setLoaded(true).build(); 2434 } else { 2435 clusterIds.add(this.regionServer.clusterId); 2436 } 2437 try { 2438 checkOpen(); 2439 requestCount.increment(); 2440 HRegion region = getRegion(request.getRegion()); 2441 Map<byte[], List<Path>> map = null; 2442 final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration()); 2443 long sizeToBeLoaded = -1; 2444 2445 // Check to see if this bulk load would exceed the space quota for this table 2446 if (spaceQuotaEnabled) { 2447 ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements(); 2448 SpaceViolationPolicyEnforcement enforcement = activeSpaceQuotas.getPolicyEnforcement( 2449 region); 2450 if (enforcement != null) { 2451 // Bulk loads must still be atomic. We must enact all or none. 
2452 List<String> filePaths = new ArrayList<>(request.getFamilyPathCount()); 2453 for (FamilyPath familyPath : request.getFamilyPathList()) { 2454 filePaths.add(familyPath.getPath()); 2455 } 2456 // Check if the batch of files exceeds the current quota 2457 sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths); 2458 } 2459 } 2460 2461 List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount()); 2462 for (FamilyPath familyPath : request.getFamilyPathList()) { 2463 familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath())); 2464 } 2465 if (!request.hasBulkToken()) { 2466 if (region.getCoprocessorHost() != null) { 2467 region.getCoprocessorHost().preBulkLoadHFile(familyPaths); 2468 } 2469 try { 2470 map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null, 2471 request.getCopyFile(), clusterIds, request.getReplicate()); 2472 } finally { 2473 if (region.getCoprocessorHost() != null) { 2474 region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map); 2475 } 2476 } 2477 } else { 2478 // secure bulk load 2479 map = regionServer.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds); 2480 } 2481 BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder(); 2482 builder.setLoaded(map != null); 2483 if (map != null) { 2484 // Treat any negative size as a flag to "ignore" updating the region size as that is 2485 // not possible to occur in real life (cannot bulk load a file with negative size) 2486 if (spaceQuotaEnabled && sizeToBeLoaded > 0) { 2487 if (LOG.isTraceEnabled()) { 2488 LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by " 2489 + sizeToBeLoaded + " bytes"); 2490 } 2491 // Inform space quotas of the new files for this region 2492 getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize( 2493 region.getRegionInfo(), sizeToBeLoaded); 2494 } 2495 } 2496 return builder.build(); 2497 } catch 
(IOException ie) { 2498 throw new ServiceException(ie); 2499 } finally { 2500 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 2501 if (metricsRegionServer != null) { 2502 metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start); 2503 } 2504 } 2505 } 2506 2507 @Override 2508 public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller, 2509 PrepareBulkLoadRequest request) throws ServiceException { 2510 try { 2511 checkOpen(); 2512 requestCount.increment(); 2513 2514 HRegion region = getRegion(request.getRegion()); 2515 2516 String bulkToken = regionServer.getSecureBulkLoadManager().prepareBulkLoad(region, request); 2517 PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder(); 2518 builder.setBulkToken(bulkToken); 2519 return builder.build(); 2520 } catch (IOException ie) { 2521 throw new ServiceException(ie); 2522 } 2523 } 2524 2525 @Override 2526 public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller, 2527 CleanupBulkLoadRequest request) throws ServiceException { 2528 try { 2529 checkOpen(); 2530 requestCount.increment(); 2531 2532 HRegion region = getRegion(request.getRegion()); 2533 2534 regionServer.getSecureBulkLoadManager().cleanupBulkLoad(region, request); 2535 return CleanupBulkLoadResponse.newBuilder().build(); 2536 } catch (IOException ie) { 2537 throw new ServiceException(ie); 2538 } 2539 } 2540 2541 @Override 2542 public CoprocessorServiceResponse execService(final RpcController controller, 2543 final CoprocessorServiceRequest request) throws ServiceException { 2544 try { 2545 checkOpen(); 2546 requestCount.increment(); 2547 HRegion region = getRegion(request.getRegion()); 2548 com.google.protobuf.Message result = execServiceOnRegion(region, request.getCall()); 2549 CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder(); 2550 builder.setRegion(RequestConverter.buildRegionSpecifier( 2551 RegionSpecifierType.REGION_NAME, 
region.getRegionInfo().getRegionName())); 2552 // TODO: COPIES!!!!!! 2553 builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()). 2554 setValue(org.apache.hbase.thirdparty.com.google.protobuf.ByteString. 2555 copyFrom(result.toByteArray()))); 2556 return builder.build(); 2557 } catch (IOException ie) { 2558 throw new ServiceException(ie); 2559 } 2560 } 2561 2562 private com.google.protobuf.Message execServiceOnRegion(HRegion region, 2563 final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException { 2564 // ignore the passed in controller (from the serialized call) 2565 ServerRpcController execController = new ServerRpcController(); 2566 return region.execService(execController, serviceCall); 2567 } 2568 2569 /** 2570 * Get data from a table. 2571 * 2572 * @param controller the RPC controller 2573 * @param request the get request 2574 * @throws ServiceException 2575 */ 2576 @Override 2577 public GetResponse get(final RpcController controller, 2578 final GetRequest request) throws ServiceException { 2579 long before = EnvironmentEdgeManager.currentTime(); 2580 OperationQuota quota = null; 2581 HRegion region = null; 2582 try { 2583 checkOpen(); 2584 requestCount.increment(); 2585 rpcGetRequestCount.increment(); 2586 region = getRegion(request.getRegion()); 2587 2588 GetResponse.Builder builder = GetResponse.newBuilder(); 2589 ClientProtos.Get get = request.getGet(); 2590 // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do 2591 // a get closest before. Throwing the UnknownProtocolException signals it that it needs 2592 // to switch and do hbase2 protocol (HBase servers do not tell clients what versions 2593 // they are; its a problem for non-native clients like asynchbase. HBASE-20225. 2594 if (get.hasClosestRowBefore() && get.getClosestRowBefore()) { 2595 throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? 
" + 2596 "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " + 2597 "reverse Scan."); 2598 } 2599 Boolean existence = null; 2600 Result r = null; 2601 RpcCallContext context = RpcServer.getCurrentCall().orElse(null); 2602 quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET); 2603 2604 Get clientGet = ProtobufUtil.toGet(get); 2605 if (get.getExistenceOnly() && region.getCoprocessorHost() != null) { 2606 existence = region.getCoprocessorHost().preExists(clientGet); 2607 } 2608 if (existence == null) { 2609 if (context != null) { 2610 r = get(clientGet, (region), null, context); 2611 } else { 2612 // for test purpose 2613 r = region.get(clientGet); 2614 } 2615 if (get.getExistenceOnly()) { 2616 boolean exists = r.getExists(); 2617 if (region.getCoprocessorHost() != null) { 2618 exists = region.getCoprocessorHost().postExists(clientGet, exists); 2619 } 2620 existence = exists; 2621 } 2622 } 2623 if (existence != null) { 2624 ClientProtos.Result pbr = 2625 ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0); 2626 builder.setResult(pbr); 2627 } else if (r != null) { 2628 ClientProtos.Result pbr; 2629 if (isClientCellBlockSupport(context) && controller instanceof HBaseRpcController 2630 && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)) { 2631 pbr = ProtobufUtil.toResultNoData(r); 2632 ((HBaseRpcController) controller).setCellScanner(CellUtil.createCellScanner(r 2633 .rawCells())); 2634 addSize(context, r, null); 2635 } else { 2636 pbr = ProtobufUtil.toResult(r); 2637 } 2638 builder.setResult(pbr); 2639 } 2640 //r.cells is null when an table.exists(get) call 2641 if (r != null && r.rawCells() != null) { 2642 quota.addGetResult(r); 2643 } 2644 return builder.build(); 2645 } catch (IOException ie) { 2646 throw new ServiceException(ie); 2647 } finally { 2648 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 2649 if (metricsRegionServer != null) { 
2650 TableDescriptor td = region != null? region.getTableDescriptor(): null; 2651 if (td != null) { 2652 metricsRegionServer.updateGet( 2653 td.getTableName(), EnvironmentEdgeManager.currentTime() - before); 2654 } 2655 } 2656 if (quota != null) { 2657 quota.close(); 2658 } 2659 } 2660 } 2661 2662 private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack, 2663 RpcCallContext context) throws IOException { 2664 region.prepareGet(get); 2665 boolean stale = region.getRegionInfo().getReplicaId() != 0; 2666 2667 // This method is almost the same as HRegion#get. 2668 List<Cell> results = new ArrayList<>(); 2669 long before = EnvironmentEdgeManager.currentTime(); 2670 // pre-get CP hook 2671 if (region.getCoprocessorHost() != null) { 2672 if (region.getCoprocessorHost().preGet(get, results)) { 2673 region.metricsUpdateForGet(results, before); 2674 return Result 2675 .create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); 2676 } 2677 } 2678 Scan scan = new Scan(get); 2679 if (scan.getLoadColumnFamiliesOnDemandValue() == null) { 2680 scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); 2681 } 2682 RegionScannerImpl scanner = null; 2683 try { 2684 scanner = region.getScanner(scan); 2685 scanner.next(results); 2686 } finally { 2687 if (scanner != null) { 2688 if (closeCallBack == null) { 2689 // If there is a context then the scanner can be added to the current 2690 // RpcCallContext. The rpc callback will take care of closing the 2691 // scanner, for eg in case 2692 // of get() 2693 context.setCallBack(scanner); 2694 } else { 2695 // The call is from multi() where the results from the get() are 2696 // aggregated and then send out to the 2697 // rpc. The rpccall back will close all such scanners created as part 2698 // of multi(). 
2699 closeCallBack.addScanner(scanner); 2700 } 2701 } 2702 } 2703 2704 // post-get CP hook 2705 if (region.getCoprocessorHost() != null) { 2706 region.getCoprocessorHost().postGet(get, results); 2707 } 2708 region.metricsUpdateForGet(results, before); 2709 2710 return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); 2711 } 2712 2713 private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException { 2714 int sum = 0; 2715 String firstRegionName = null; 2716 for (RegionAction regionAction : request.getRegionActionList()) { 2717 if (sum == 0) { 2718 firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray()); 2719 } 2720 sum += regionAction.getActionCount(); 2721 } 2722 if (sum > rowSizeWarnThreshold) { 2723 ld.logBatchWarning(firstRegionName, sum, rowSizeWarnThreshold); 2724 if (rejectRowsWithSizeOverThreshold) { 2725 throw new ServiceException( 2726 "Rejecting large batch operation for current batch with firstRegionName: " 2727 + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: " 2728 + rowSizeWarnThreshold); 2729 } 2730 } 2731 } 2732 2733 /** 2734 * Execute multiple actions on a table: get, mutate, and/or execCoprocessor 2735 * 2736 * @param rpcc the RPC controller 2737 * @param request the multi request 2738 * @throws ServiceException 2739 */ 2740 @Override 2741 public MultiResponse multi(final RpcController rpcc, final MultiRequest request) 2742 throws ServiceException { 2743 try { 2744 checkOpen(); 2745 } catch (IOException ie) { 2746 throw new ServiceException(ie); 2747 } 2748 2749 checkBatchSizeAndLogLargeSize(request); 2750 2751 // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data. 2752 // It is also the conduit via which we pass back data. 2753 HBaseRpcController controller = (HBaseRpcController)rpcc; 2754 CellScanner cellScanner = controller != null ? 
controller.cellScanner(): null; 2755 if (controller != null) { 2756 controller.setCellScanner(null); 2757 } 2758 2759 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; 2760 2761 // this will contain all the cells that we need to return. It's created later, if needed. 2762 List<CellScannable> cellsToReturn = null; 2763 MultiResponse.Builder responseBuilder = MultiResponse.newBuilder(); 2764 RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder(); 2765 Boolean processed = null; 2766 RegionScannersCloseCallBack closeCallBack = null; 2767 RpcCallContext context = RpcServer.getCurrentCall().orElse(null); 2768 this.rpcMultiRequestCount.increment(); 2769 this.requestCount.increment(); 2770 Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new HashMap<>(request 2771 .getRegionActionCount()); 2772 ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements(); 2773 for (RegionAction regionAction : request.getRegionActionList()) { 2774 OperationQuota quota; 2775 HRegion region; 2776 regionActionResultBuilder.clear(); 2777 RegionSpecifier regionSpecifier = regionAction.getRegion(); 2778 try { 2779 region = getRegion(regionSpecifier); 2780 quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList()); 2781 } catch (IOException e) { 2782 rpcServer.getMetrics().exception(e); 2783 regionActionResultBuilder.setException(ResponseConverter.buildException(e)); 2784 responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); 2785 // All Mutations in this RegionAction not executed as we can not see the Region online here 2786 // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner 2787 // corresponding to these Mutations. 2788 skipCellsForMutations(regionAction.getActionList(), cellScanner); 2789 continue; // For this region it's a failure. 
2790 } 2791 2792 if (regionAction.hasAtomic() && regionAction.getAtomic()) { 2793 // How does this call happen? It may need some work to play well w/ the surroundings. 2794 // Need to return an item per Action along w/ Action index. TODO. 2795 try { 2796 if (request.hasCondition()) { 2797 Condition condition = request.getCondition(); 2798 byte[] row = condition.getRow().toByteArray(); 2799 byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null; 2800 byte[] qualifier = condition.hasQualifier() ? 2801 condition.getQualifier().toByteArray() : null; 2802 CompareOperator op = condition.hasCompareType() ? 2803 CompareOperator.valueOf(condition.getCompareType().name()) : null; 2804 ByteArrayComparable comparator = condition.hasComparator() ? 2805 ProtobufUtil.toComparator(condition.getComparator()) : null; 2806 Filter filter = condition.hasFilter() ? 2807 ProtobufUtil.toFilter(condition.getFilter()) : null; 2808 TimeRange timeRange = condition.hasTimeRange() ? 2809 ProtobufUtil.toTimeRange(condition.getTimeRange()) : 2810 TimeRange.allTime(); 2811 processed = 2812 checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family, 2813 qualifier, op, comparator, filter, timeRange, regionActionResultBuilder, 2814 spaceQuotaEnforcement); 2815 } else { 2816 doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(), 2817 cellScanner, spaceQuotaEnforcement); 2818 processed = Boolean.TRUE; 2819 } 2820 } catch (IOException e) { 2821 rpcServer.getMetrics().exception(e); 2822 // As it's atomic, we may expect it's a global failure. 2823 regionActionResultBuilder.setException(ResponseConverter.buildException(e)); 2824 } 2825 } else { 2826 // doNonAtomicRegionMutation manages the exception internally 2827 if (context != null && closeCallBack == null) { 2828 // An RpcCallBack that creates a list of scanners that needs to perform callBack 2829 // operation on completion of multiGets. 
2830 // Set this only once 2831 closeCallBack = new RegionScannersCloseCallBack(); 2832 context.setCallBack(closeCallBack); 2833 } 2834 cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner, 2835 regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context, 2836 spaceQuotaEnforcement); 2837 } 2838 responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); 2839 quota.close(); 2840 ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics(); 2841 if(regionLoadStats != null) { 2842 regionStats.put(regionSpecifier, regionLoadStats); 2843 } 2844 } 2845 // Load the controller with the Cells to return. 2846 if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) { 2847 controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn)); 2848 } 2849 2850 if (processed != null) { 2851 responseBuilder.setProcessed(processed); 2852 } 2853 2854 MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder(); 2855 for(Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat: regionStats.entrySet()){ 2856 builder.addRegion(stat.getKey()); 2857 builder.addStat(stat.getValue()); 2858 } 2859 responseBuilder.setRegionStatistics(builder); 2860 return responseBuilder.build(); 2861 } 2862 2863 private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) { 2864 if (cellScanner == null) { 2865 return; 2866 } 2867 for (Action action : actions) { 2868 skipCellsForMutation(action, cellScanner); 2869 } 2870 } 2871 2872 private void skipCellsForMutation(Action action, CellScanner cellScanner) { 2873 if (cellScanner == null) { 2874 return; 2875 } 2876 try { 2877 if (action.hasMutation()) { 2878 MutationProto m = action.getMutation(); 2879 if (m.hasAssociatedCellCount()) { 2880 for (int i = 0; i < m.getAssociatedCellCount(); i++) { 2881 cellScanner.advance(); 2882 } 2883 } 2884 } 2885 } catch (IOException e) { 2886 // No need to handle these Individual Muatation level 
issue. Any way this entire RegionAction 2887 // marked as failed as we could not see the Region here. At client side the top level 2888 // RegionAction exception will be considered first. 2889 LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e); 2890 } 2891 } 2892 2893 /** 2894 * Mutate data in a table. 2895 * 2896 * @param rpcc the RPC controller 2897 * @param request the mutate request 2898 */ 2899 @Override 2900 public MutateResponse mutate(final RpcController rpcc, 2901 final MutateRequest request) throws ServiceException { 2902 // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data. 2903 // It is also the conduit via which we pass back data. 2904 HBaseRpcController controller = (HBaseRpcController)rpcc; 2905 CellScanner cellScanner = controller != null ? controller.cellScanner() : null; 2906 OperationQuota quota = null; 2907 RpcCallContext context = RpcServer.getCurrentCall().orElse(null); 2908 ActivePolicyEnforcement spaceQuotaEnforcement = null; 2909 MutationType type = null; 2910 HRegion region = null; 2911 long before = EnvironmentEdgeManager.currentTime(); 2912 // Clear scanner so we are not holding on to reference across call. 2913 if (controller != null) { 2914 controller.setCellScanner(null); 2915 } 2916 try { 2917 checkOpen(); 2918 requestCount.increment(); 2919 rpcMutateRequestCount.increment(); 2920 region = getRegion(request.getRegion()); 2921 MutateResponse.Builder builder = MutateResponse.newBuilder(); 2922 MutationProto mutation = request.getMutation(); 2923 if (!region.getRegionInfo().isMetaRegion()) { 2924 regionServer.getMemStoreFlusher().reclaimMemStoreMemory(); 2925 } 2926 long nonceGroup = request.hasNonceGroup() ? 
request.getNonceGroup() : HConstants.NO_NONCE; 2927 Result r = null; 2928 Boolean processed = null; 2929 type = mutation.getMutateType(); 2930 2931 quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE); 2932 spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements(); 2933 2934 switch (type) { 2935 case APPEND: 2936 // TODO: this doesn't actually check anything. 2937 r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); 2938 break; 2939 case INCREMENT: 2940 // TODO: this doesn't actually check anything. 2941 r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); 2942 break; 2943 case PUT: 2944 Put put = ProtobufUtil.toPut(mutation, cellScanner); 2945 checkCellSizeLimit(region, put); 2946 // Throws an exception when violated 2947 spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); 2948 quota.addMutation(put); 2949 if (request.hasCondition()) { 2950 Condition condition = request.getCondition(); 2951 byte[] row = condition.getRow().toByteArray(); 2952 byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null; 2953 byte[] qualifier = condition.hasQualifier() ? 2954 condition.getQualifier().toByteArray() : null; 2955 CompareOperator op = condition.hasCompareType() ? 2956 CompareOperator.valueOf(condition.getCompareType().name()) : null; 2957 ByteArrayComparable comparator = condition.hasComparator() ? 2958 ProtobufUtil.toComparator(condition.getComparator()) : null; 2959 Filter filter = condition.hasFilter() ? 2960 ProtobufUtil.toFilter(condition.getFilter()) : null; 2961 TimeRange timeRange = condition.hasTimeRange() ? 
2962 ProtobufUtil.toTimeRange(condition.getTimeRange()) : 2963 TimeRange.allTime(); 2964 if (region.getCoprocessorHost() != null) { 2965 if (filter != null) { 2966 processed = region.getCoprocessorHost().preCheckAndPut(row, filter, put); 2967 } else { 2968 processed = region.getCoprocessorHost() 2969 .preCheckAndPut(row, family, qualifier, op, comparator, put); 2970 } 2971 } 2972 if (processed == null) { 2973 boolean result; 2974 if (filter != null) { 2975 result = region.checkAndMutate(row, filter, timeRange, put); 2976 } else { 2977 result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange, 2978 put); 2979 } 2980 if (region.getCoprocessorHost() != null) { 2981 if (filter != null) { 2982 result = region.getCoprocessorHost().postCheckAndPut(row, filter, put, result); 2983 } else { 2984 result = region.getCoprocessorHost() 2985 .postCheckAndPut(row, family, qualifier, op, comparator, put, result); 2986 } 2987 } 2988 processed = result; 2989 } 2990 } else { 2991 region.put(put); 2992 processed = Boolean.TRUE; 2993 } 2994 break; 2995 case DELETE: 2996 Delete delete = ProtobufUtil.toDelete(mutation, cellScanner); 2997 checkCellSizeLimit(region, delete); 2998 spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete); 2999 quota.addMutation(delete); 3000 if (request.hasCondition()) { 3001 Condition condition = request.getCondition(); 3002 byte[] row = condition.getRow().toByteArray(); 3003 byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null; 3004 byte[] qualifier = condition.hasQualifier() ? 3005 condition.getQualifier().toByteArray() : null; 3006 CompareOperator op = condition.hasCompareType() ? 3007 CompareOperator.valueOf(condition.getCompareType().name()) : null; 3008 ByteArrayComparable comparator = condition.hasComparator() ? 3009 ProtobufUtil.toComparator(condition.getComparator()) : null; 3010 Filter filter = condition.hasFilter() ? 
3011 ProtobufUtil.toFilter(condition.getFilter()) : null; 3012 TimeRange timeRange = condition.hasTimeRange() ? 3013 ProtobufUtil.toTimeRange(condition.getTimeRange()) : 3014 TimeRange.allTime(); 3015 if (region.getCoprocessorHost() != null) { 3016 if (filter != null) { 3017 processed = region.getCoprocessorHost().preCheckAndDelete(row, filter, delete); 3018 } else { 3019 processed = region.getCoprocessorHost() 3020 .preCheckAndDelete(row, family, qualifier, op, comparator, delete); 3021 } 3022 } 3023 if (processed == null) { 3024 boolean result; 3025 if (filter != null) { 3026 result = region.checkAndMutate(row, filter, timeRange, delete); 3027 } else { 3028 result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange, 3029 delete); 3030 } 3031 if (region.getCoprocessorHost() != null) { 3032 if (filter != null) { 3033 result = region.getCoprocessorHost().postCheckAndDelete(row, filter, delete, 3034 result); 3035 } else { 3036 result = region.getCoprocessorHost() 3037 .postCheckAndDelete(row, family, qualifier, op, comparator, delete, result); 3038 } 3039 } 3040 processed = result; 3041 } 3042 } else { 3043 region.delete(delete); 3044 processed = Boolean.TRUE; 3045 } 3046 break; 3047 default: 3048 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); 3049 } 3050 if (processed != null) { 3051 builder.setProcessed(processed.booleanValue()); 3052 } 3053 boolean clientCellBlockSupported = isClientCellBlockSupport(context); 3054 addResult(builder, r, controller, clientCellBlockSupported); 3055 if (clientCellBlockSupported) { 3056 addSize(context, r, null); 3057 } 3058 return builder.build(); 3059 } catch (IOException ie) { 3060 regionServer.checkFileSystem(); 3061 throw new ServiceException(ie); 3062 } finally { 3063 if (quota != null) { 3064 quota.close(); 3065 } 3066 // Update metrics 3067 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 3068 if (metricsRegionServer != null && type != null) { 3069 
long after = EnvironmentEdgeManager.currentTime(); 3070 switch (type) { 3071 case DELETE: 3072 if (request.hasCondition()) { 3073 metricsRegionServer.updateCheckAndDelete(after - before); 3074 } else { 3075 metricsRegionServer.updateDelete( 3076 region == null ? null : region.getRegionInfo().getTable(), after - before); 3077 } 3078 break; 3079 case PUT: 3080 if (request.hasCondition()) { 3081 metricsRegionServer.updateCheckAndPut(after - before); 3082 } else { 3083 metricsRegionServer.updatePut( 3084 region == null ? null : region.getRegionInfo().getTable(),after - before); 3085 } 3086 break; 3087 default: 3088 break; 3089 } 3090 } 3091 } 3092 } 3093 3094 // This is used to keep compatible with the old client implementation. Consider remove it if we 3095 // decide to drop the support of the client that still sends close request to a region scanner 3096 // which has already been exhausted. 3097 @Deprecated 3098 private static final IOException SCANNER_ALREADY_CLOSED = new IOException() { 3099 3100 private static final long serialVersionUID = -4305297078988180130L; 3101 3102 @Override 3103 public synchronized Throwable fillInStackTrace() { 3104 return this; 3105 } 3106 }; 3107 3108 private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException { 3109 String scannerName = Long.toString(request.getScannerId()); 3110 RegionScannerHolder rsh = scanners.get(scannerName); 3111 if (rsh == null) { 3112 // just ignore the next or close request if scanner does not exists. 3113 if (closedScanners.getIfPresent(scannerName) != null) { 3114 throw SCANNER_ALREADY_CLOSED; 3115 } else { 3116 LOG.warn("Client tried to access missing scanner " + scannerName); 3117 throw new UnknownScannerException( 3118 "Unknown scanner '" + scannerName + "'. 
This can happen due to any of the following " + 3119 "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " + 3120 "long wait between consecutive client checkins, c) Server may be closing down, " + 3121 "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " + 3122 "possible fix would be increasing the value of" + 3123 "'hbase.client.scanner.timeout.period' configuration."); 3124 } 3125 } 3126 RegionInfo hri = rsh.s.getRegionInfo(); 3127 // Yes, should be the same instance 3128 if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) { 3129 String msg = "Region has changed on the scanner " + scannerName + ": regionName=" 3130 + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r; 3131 LOG.warn(msg + ", closing..."); 3132 scanners.remove(scannerName); 3133 try { 3134 rsh.s.close(); 3135 } catch (IOException e) { 3136 LOG.warn("Getting exception closing " + scannerName, e); 3137 } finally { 3138 try { 3139 regionServer.getLeaseManager().cancelLease(scannerName); 3140 } catch (LeaseException e) { 3141 LOG.warn("Getting exception closing " + scannerName, e); 3142 } 3143 } 3144 throw new NotServingRegionException(msg); 3145 } 3146 return rsh; 3147 } 3148 3149 private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.Builder builder) 3150 throws IOException { 3151 HRegion region = getRegion(request.getRegion()); 3152 ClientProtos.Scan protoScan = request.getScan(); 3153 boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); 3154 Scan scan = ProtobufUtil.toScan(protoScan); 3155 // if the request doesn't set this, get the default region setting. 
3156 if (!isLoadingCfsOnDemandSet) { 3157 scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); 3158 } 3159 3160 if (!scan.hasFamilies()) { 3161 // Adding all families to scanner 3162 for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) { 3163 scan.addFamily(family); 3164 } 3165 } 3166 if (region.getCoprocessorHost() != null) { 3167 // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a 3168 // wrapper for the core created RegionScanner 3169 region.getCoprocessorHost().preScannerOpen(scan); 3170 } 3171 RegionScannerImpl coreScanner = region.getScanner(scan); 3172 Shipper shipper = coreScanner; 3173 RegionScanner scanner = coreScanner; 3174 if (region.getCoprocessorHost() != null) { 3175 scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); 3176 } 3177 long scannerId = scannerIdGenerator.generateNewScannerId(); 3178 builder.setScannerId(scannerId); 3179 builder.setMvccReadPoint(scanner.getMvccReadPoint()); 3180 builder.setTtl(scannerLeaseTimeoutPeriod); 3181 String scannerName = String.valueOf(scannerId); 3182 return addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult()); 3183 } 3184 3185 private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh) 3186 throws OutOfOrderScannerNextException { 3187 // if nextCallSeq does not match throw Exception straight away. This needs to be 3188 // performed even before checking of Lease. 
3189 // See HBASE-5974 3190 if (request.hasNextCallSeq()) { 3191 long callSeq = request.getNextCallSeq(); 3192 if (!rsh.incNextCallSeq(callSeq)) { 3193 throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.getNextCallSeq() 3194 + " But the nextCallSeq got from client: " + request.getNextCallSeq() + "; request=" 3195 + TextFormat.shortDebugString(request)); 3196 } 3197 } 3198 } 3199 3200 private void addScannerLeaseBack(LeaseManager.Lease lease) { 3201 try { 3202 regionServer.getLeaseManager().addLease(lease); 3203 } catch (LeaseStillHeldException e) { 3204 // should not happen as the scanner id is unique. 3205 throw new AssertionError(e); 3206 } 3207 } 3208 3209 private long getTimeLimit(HBaseRpcController controller, boolean allowHeartbeatMessages) { 3210 // Set the time limit to be half of the more restrictive timeout value (one of the 3211 // timeout values must be positive). In the event that both values are positive, the 3212 // more restrictive of the two is used to calculate the limit. 3213 if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) { 3214 long timeLimitDelta; 3215 if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) { 3216 timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout); 3217 } else { 3218 timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout; 3219 } 3220 if (controller != null && controller.getCallTimeout() > 0) { 3221 timeLimitDelta = Math.min(timeLimitDelta, controller.getCallTimeout()); 3222 } 3223 // Use half of whichever timeout value was more restrictive... But don't allow 3224 // the time limit to be less than the allowable minimum (could cause an 3225 // immediatate timeout before scanning any data). 3226 timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta); 3227 // XXX: Can not use EnvironmentEdge here because TestIncrementTimeRange use a 3228 // ManualEnvironmentEdge. Consider using System.nanoTime instead. 
3229 return System.currentTimeMillis() + timeLimitDelta; 3230 } 3231 // Default value of timeLimit is negative to indicate no timeLimit should be 3232 // enforced. 3233 return -1L; 3234 } 3235 3236 private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows, 3237 ScannerContext scannerContext, ScanResponse.Builder builder) { 3238 if (numOfCompleteRows >= limitOfRows) { 3239 if (LOG.isTraceEnabled()) { 3240 LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows + 3241 " scannerContext: " + scannerContext); 3242 } 3243 builder.setMoreResults(false); 3244 } 3245 } 3246 3247 // return whether we have more results in region. 3248 private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh, 3249 long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results, 3250 ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCallContext context) 3251 throws IOException { 3252 HRegion region = rsh.r; 3253 RegionScanner scanner = rsh.s; 3254 long maxResultSize; 3255 if (scanner.getMaxResultSize() > 0) { 3256 maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize); 3257 } else { 3258 maxResultSize = maxQuotaResultSize; 3259 } 3260 // This is cells inside a row. Default size is 10 so if many versions or many cfs, 3261 // then we'll resize. Resizings show in profiler. Set it higher than 10. For now 3262 // arbitrary 32. TODO: keep record of general size of results being returned. 
3263 List<Cell> values = new ArrayList<>(32); 3264 region.startRegionOperation(Operation.SCAN); 3265 try { 3266 int numOfResults = 0; 3267 int numOfCompleteRows = 0; 3268 long before = EnvironmentEdgeManager.currentTime(); 3269 synchronized (scanner) { 3270 boolean stale = (region.getRegionInfo().getReplicaId() != 0); 3271 boolean clientHandlesPartials = 3272 request.hasClientHandlesPartials() && request.getClientHandlesPartials(); 3273 boolean clientHandlesHeartbeats = 3274 request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats(); 3275 3276 // On the server side we must ensure that the correct ordering of partial results is 3277 // returned to the client to allow them to properly reconstruct the partial results. 3278 // If the coprocessor host is adding to the result list, we cannot guarantee the 3279 // correct ordering of partial results and so we prevent partial results from being 3280 // formed. 3281 boolean serverGuaranteesOrderOfPartials = results.isEmpty(); 3282 boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials; 3283 boolean moreRows = false; 3284 3285 // Heartbeat messages occur when the processing of the ScanRequest is exceeds a 3286 // certain time threshold on the server. When the time threshold is exceeded, the 3287 // server stops the scan and sends back whatever Results it has accumulated within 3288 // that time period (may be empty). Since heartbeat messages have the potential to 3289 // create partial Results (in the event that the timeout occurs in the middle of a 3290 // row), we must only generate heartbeat messages when the client can handle both 3291 // heartbeats AND partials 3292 boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults; 3293 3294 long timeLimit = getTimeLimit(controller, allowHeartbeatMessages); 3295 3296 final LimitScope sizeScope = 3297 allowPartialResults ? 
LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; 3298 final LimitScope timeScope = 3299 allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; 3300 3301 boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics(); 3302 3303 // Configure with limits for this RPC. Set keep progress true since size progress 3304 // towards size limit should be kept between calls to nextRaw 3305 ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); 3306 // maxResultSize - either we can reach this much size for all cells(being read) data or sum 3307 // of heap size occupied by cells(being read). Cell data means its key and value parts. 3308 contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize); 3309 contextBuilder.setBatchLimit(scanner.getBatch()); 3310 contextBuilder.setTimeLimit(timeScope, timeLimit); 3311 contextBuilder.setTrackMetrics(trackMetrics); 3312 ScannerContext scannerContext = contextBuilder.build(); 3313 boolean limitReached = false; 3314 while (numOfResults < maxResults) { 3315 // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The 3316 // batch limit is a limit on the number of cells per Result. Thus, if progress is 3317 // being tracked (i.e. scannerContext.keepProgress() is true) then we need to 3318 // reset the batch progress between nextRaw invocations since we don't want the 3319 // batch progress from previous calls to affect future calls 3320 scannerContext.setBatchProgress(0); 3321 3322 // Collect values to be returned here 3323 moreRows = scanner.nextRaw(values, scannerContext); 3324 3325 if (!values.isEmpty()) { 3326 if (limitOfRows > 0) { 3327 // First we need to check if the last result is partial and we have a row change. If 3328 // so then we need to increase the numOfCompleteRows. 
3329 if (results.isEmpty()) { 3330 if (rsh.rowOfLastPartialResult != null && 3331 !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)) { 3332 numOfCompleteRows++; 3333 checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, 3334 builder); 3335 } 3336 } else { 3337 Result lastResult = results.get(results.size() - 1); 3338 if (lastResult.mayHaveMoreCellsInRow() && 3339 !CellUtil.matchingRows(values.get(0), lastResult.getRow())) { 3340 numOfCompleteRows++; 3341 checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, 3342 builder); 3343 } 3344 } 3345 if (builder.hasMoreResults() && !builder.getMoreResults()) { 3346 break; 3347 } 3348 } 3349 boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow(); 3350 Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow); 3351 lastBlock.setValue(addSize(context, r, lastBlock.getValue())); 3352 results.add(r); 3353 numOfResults++; 3354 if (!mayHaveMoreCellsInRow && limitOfRows > 0) { 3355 numOfCompleteRows++; 3356 checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder); 3357 if (builder.hasMoreResults() && !builder.getMoreResults()) { 3358 break; 3359 } 3360 } 3361 } else if (!moreRows && !results.isEmpty()) { 3362 // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of 3363 // last result is false. Otherwise it's possible that: the first nextRaw returned 3364 // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the 3365 // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will 3366 // return with moreRows=false, which means moreResultsInRegion would be false, it will 3367 // be a contradictory state (HBASE-21206). 
3368 int lastIdx = results.size() - 1; 3369 Result r = results.get(lastIdx); 3370 if (r.mayHaveMoreCellsInRow()) { 3371 results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false)); 3372 } 3373 } 3374 boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS); 3375 boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS); 3376 boolean resultsLimitReached = numOfResults >= maxResults; 3377 limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached; 3378 3379 if (limitReached || !moreRows) { 3380 // We only want to mark a ScanResponse as a heartbeat message in the event that 3381 // there are more values to be read server side. If there aren't more values, 3382 // marking it as a heartbeat is wasteful because the client will need to issue 3383 // another ScanRequest only to realize that they already have all the values 3384 if (moreRows && timeLimitReached) { 3385 // Heartbeat messages occur when the time limit has been reached. 3386 builder.setHeartbeatMessage(true); 3387 if (rsh.needCursor) { 3388 Cell cursorCell = scannerContext.getLastPeekedCell(); 3389 if (cursorCell != null) { 3390 builder.setCursor(ProtobufUtil.toCursor(cursorCell)); 3391 } 3392 } 3393 } 3394 break; 3395 } 3396 values.clear(); 3397 } 3398 builder.setMoreResultsInRegion(moreRows); 3399 // Check to see if the client requested that we track metrics server side. If the 3400 // client requested metrics, retrieve the metrics from the scanner context. 
3401 if (trackMetrics) { 3402 Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap(); 3403 ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder(); 3404 NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder(); 3405 3406 for (Entry<String, Long> entry : metrics.entrySet()) { 3407 pairBuilder.setName(entry.getKey()); 3408 pairBuilder.setValue(entry.getValue()); 3409 metricBuilder.addMetrics(pairBuilder.build()); 3410 } 3411 3412 builder.setScanMetrics(metricBuilder.build()); 3413 } 3414 } 3415 long end = EnvironmentEdgeManager.currentTime(); 3416 long responseCellSize = context != null ? context.getResponseCellSize() : 0; 3417 region.getMetrics().updateScanTime(end - before); 3418 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 3419 if (metricsRegionServer != null) { 3420 metricsRegionServer.updateScanSize( 3421 region.getTableDescriptor().getTableName(), responseCellSize); 3422 metricsRegionServer.updateScanTime( 3423 region.getTableDescriptor().getTableName(), end - before); 3424 } 3425 } finally { 3426 region.closeRegionOperation(); 3427 } 3428 // coprocessor postNext hook 3429 if (region.getCoprocessorHost() != null) { 3430 region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true); 3431 } 3432 } 3433 3434 /** 3435 * Scan data in a table. 3436 * 3437 * @param controller the RPC controller 3438 * @param request the scan request 3439 * @throws ServiceException 3440 */ 3441 @Override 3442 public ScanResponse scan(final RpcController controller, final ScanRequest request) 3443 throws ServiceException { 3444 if (controller != null && !(controller instanceof HBaseRpcController)) { 3445 throw new UnsupportedOperationException( 3446 "We only do " + "HBaseRpcControllers! 
FIX IF A PROBLEM: " + controller); 3447 } 3448 if (!request.hasScannerId() && !request.hasScan()) { 3449 throw new ServiceException( 3450 new DoNotRetryIOException("Missing required input: scannerId or scan")); 3451 } 3452 try { 3453 checkOpen(); 3454 } catch (IOException e) { 3455 if (request.hasScannerId()) { 3456 String scannerName = Long.toString(request.getScannerId()); 3457 if (LOG.isDebugEnabled()) { 3458 LOG.debug( 3459 "Server shutting down and client tried to access missing scanner " + scannerName); 3460 } 3461 final LeaseManager leaseManager = regionServer.getLeaseManager(); 3462 if (leaseManager != null) { 3463 try { 3464 leaseManager.cancelLease(scannerName); 3465 } catch (LeaseException le) { 3466 // No problem, ignore 3467 if (LOG.isTraceEnabled()) { 3468 LOG.trace("Un-able to cancel lease of scanner. It could already be closed."); 3469 } 3470 } 3471 } 3472 } 3473 throw new ServiceException(e); 3474 } 3475 requestCount.increment(); 3476 rpcScanRequestCount.increment(); 3477 RegionScannerHolder rsh; 3478 ScanResponse.Builder builder = ScanResponse.newBuilder(); 3479 try { 3480 if (request.hasScannerId()) { 3481 // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000 3482 // for more details. 3483 builder.setScannerId(request.getScannerId()); 3484 rsh = getRegionScanner(request); 3485 } else { 3486 rsh = newRegionScanner(request, builder); 3487 } 3488 } catch (IOException e) { 3489 if (e == SCANNER_ALREADY_CLOSED) { 3490 // Now we will close scanner automatically if there are no more results for this region but 3491 // the old client will still send a close request to us. Just ignore it and return. 
3492 return builder.build(); 3493 } 3494 throw new ServiceException(e); 3495 } 3496 HRegion region = rsh.r; 3497 String scannerName = rsh.scannerName; 3498 LeaseManager.Lease lease; 3499 try { 3500 // Remove lease while its being processed in server; protects against case 3501 // where processing of request takes > lease expiration time. 3502 lease = regionServer.getLeaseManager().removeLease(scannerName); 3503 } catch (LeaseException e) { 3504 throw new ServiceException(e); 3505 } 3506 if (request.hasRenew() && request.getRenew()) { 3507 // add back and return 3508 addScannerLeaseBack(lease); 3509 try { 3510 checkScanNextCallSeq(request, rsh); 3511 } catch (OutOfOrderScannerNextException e) { 3512 throw new ServiceException(e); 3513 } 3514 return builder.build(); 3515 } 3516 OperationQuota quota; 3517 try { 3518 quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN); 3519 } catch (IOException e) { 3520 addScannerLeaseBack(lease); 3521 throw new ServiceException(e); 3522 } 3523 try { 3524 checkScanNextCallSeq(request, rsh); 3525 } catch (OutOfOrderScannerNextException e) { 3526 addScannerLeaseBack(lease); 3527 throw new ServiceException(e); 3528 } 3529 // Now we have increased the next call sequence. If we give client an error, the retry will 3530 // never success. So we'd better close the scanner and return a DoNotRetryIOException to client 3531 // and then client will try to open a new scanner. 3532 boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false; 3533 int rows; // this is scan.getCaching 3534 if (request.hasNumberOfRows()) { 3535 rows = request.getNumberOfRows(); 3536 } else { 3537 rows = closeScanner ? 0 : 1; 3538 } 3539 RpcCallContext context = RpcServer.getCurrentCall().orElse(null); 3540 // now let's do the real scan. 
3541 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); 3542 RegionScanner scanner = rsh.s; 3543 // this is the limit of rows for this scan, if we the number of rows reach this value, we will 3544 // close the scanner. 3545 int limitOfRows; 3546 if (request.hasLimitOfRows()) { 3547 limitOfRows = request.getLimitOfRows(); 3548 } else { 3549 limitOfRows = -1; 3550 } 3551 MutableObject<Object> lastBlock = new MutableObject<>(); 3552 boolean scannerClosed = false; 3553 try { 3554 List<Result> results = new ArrayList<>(Math.min(rows, 512)); 3555 if (rows > 0) { 3556 boolean done = false; 3557 // Call coprocessor. Get region info from scanner. 3558 if (region.getCoprocessorHost() != null) { 3559 Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows); 3560 if (!results.isEmpty()) { 3561 for (Result r : results) { 3562 lastBlock.setValue(addSize(context, r, lastBlock.getValue())); 3563 } 3564 } 3565 if (bypass != null && bypass.booleanValue()) { 3566 done = true; 3567 } 3568 } 3569 if (!done) { 3570 scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows, 3571 results, builder, lastBlock, context); 3572 } else { 3573 builder.setMoreResultsInRegion(!results.isEmpty()); 3574 } 3575 } else { 3576 // This is a open scanner call with numberOfRow = 0, so set more results in region to true. 3577 builder.setMoreResultsInRegion(true); 3578 } 3579 3580 quota.addScanResult(results); 3581 addResults(builder, results, (HBaseRpcController) controller, 3582 RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()), 3583 isClientCellBlockSupport(context)); 3584 if (scanner.isFilterDone() && results.isEmpty()) { 3585 // If the scanner's filter - if any - is done with the scan 3586 // only set moreResults to false if the results is empty. This is used to keep compatible 3587 // with the old scan implementation where we just ignore the returned results if moreResults 3588 // is false. 
Can remove the isEmpty check after we get rid of the old implementation. 3589 builder.setMoreResults(false); 3590 } 3591 // Later we may close the scanner depending on this flag so here we need to make sure that we 3592 // have already set this flag. 3593 assert builder.hasMoreResultsInRegion(); 3594 // we only set moreResults to false in the above code, so set it to true if we haven't set it 3595 // yet. 3596 if (!builder.hasMoreResults()) { 3597 builder.setMoreResults(true); 3598 } 3599 if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) { 3600 // Record the last cell of the last result if it is a partial result 3601 // We need this to calculate the complete rows we have returned to client as the 3602 // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the 3603 // current row. We may filter out all the remaining cells for the current row and just 3604 // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to 3605 // check for row change. 3606 Result lastResult = results.get(results.size() - 1); 3607 if (lastResult.mayHaveMoreCellsInRow()) { 3608 rsh.rowOfLastPartialResult = lastResult.getRow(); 3609 } else { 3610 rsh.rowOfLastPartialResult = null; 3611 } 3612 } 3613 if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) { 3614 scannerClosed = true; 3615 closeScanner(region, scanner, scannerName, context); 3616 } 3617 return builder.build(); 3618 } catch (IOException e) { 3619 try { 3620 // scanner is closed here 3621 scannerClosed = true; 3622 // The scanner state might be left in a dirty state, so we will tell the Client to 3623 // fail this RPC and close the scanner while opening up another one from the start of 3624 // row that the client has last seen. 3625 closeScanner(region, scanner, scannerName, context); 3626 3627 // If it is a DoNotRetryIOException already, throw as it is. 
Unfortunately, DNRIOE is 3628 // used in two different semantics. 3629 // (1) The first is to close the client scanner and bubble up the exception all the way 3630 // to the application. This is preferred when the exception is really un-recoverable 3631 // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this 3632 // bucket usually. 3633 // (2) Second semantics is to close the current region scanner only, but continue the 3634 // client scanner by overriding the exception. This is usually UnknownScannerException, 3635 // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the 3636 // application-level ClientScanner has to continue without bubbling up the exception to 3637 // the client. See ClientScanner code to see how it deals with these special exceptions. 3638 if (e instanceof DoNotRetryIOException) { 3639 throw e; 3640 } 3641 3642 // If it is a FileNotFoundException, wrap as a 3643 // DoNotRetryIOException. This can avoid the retry in ClientScanner. 3644 if (e instanceof FileNotFoundException) { 3645 throw new DoNotRetryIOException(e); 3646 } 3647 3648 // We closed the scanner already. Instead of throwing the IOException, and client 3649 // retrying with the same scannerId only to get USE on the next RPC, we directly throw 3650 // a special exception to save an RPC. 3651 if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) { 3652 // 1.4.0+ clients know how to handle 3653 throw new ScannerResetException("Scanner is closed on the server-side", e); 3654 } else { 3655 // older clients do not know about SRE. Just throw USE, which they will handle 3656 throw new UnknownScannerException("Throwing UnknownScannerException to reset the client" 3657 + " scanner state for clients older than 1.3.", e); 3658 } 3659 } catch (IOException ioe) { 3660 throw new ServiceException(ioe); 3661 } 3662 } finally { 3663 if (!scannerClosed) { 3664 // Adding resets expiration time on lease. 
3665 // the closeCallBack will be set in closeScanner so here we only care about shippedCallback 3666 if (context != null) { 3667 context.setCallBack(rsh.shippedCallback); 3668 } else { 3669 // When context != null, adding back the lease will be done in callback set above. 3670 addScannerLeaseBack(lease); 3671 } 3672 } 3673 quota.close(); 3674 } 3675 } 3676 3677 private void closeScanner(HRegion region, RegionScanner scanner, String scannerName, 3678 RpcCallContext context) throws IOException { 3679 if (region.getCoprocessorHost() != null) { 3680 if (region.getCoprocessorHost().preScannerClose(scanner)) { 3681 // bypass the actual close. 3682 return; 3683 } 3684 } 3685 RegionScannerHolder rsh = scanners.remove(scannerName); 3686 if (rsh != null) { 3687 if (context != null) { 3688 context.setCallBack(rsh.closeCallBack); 3689 } else { 3690 rsh.s.close(); 3691 } 3692 if (region.getCoprocessorHost() != null) { 3693 region.getCoprocessorHost().postScannerClose(scanner); 3694 } 3695 closedScanners.put(scannerName, scannerName); 3696 } 3697 } 3698 3699 @Override 3700 public CoprocessorServiceResponse execRegionServerService(RpcController controller, 3701 CoprocessorServiceRequest request) throws ServiceException { 3702 rpcPreCheck("execRegionServerService"); 3703 return regionServer.execRegionServerService(controller, request); 3704 } 3705 3706 @Override 3707 public UpdateConfigurationResponse updateConfiguration( 3708 RpcController controller, UpdateConfigurationRequest request) 3709 throws ServiceException { 3710 try { 3711 this.regionServer.updateConfiguration(); 3712 } catch (Exception e) { 3713 throw new ServiceException(e); 3714 } 3715 return UpdateConfigurationResponse.getDefaultInstance(); 3716 } 3717 3718 @Override 3719 public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots( 3720 RpcController controller, GetSpaceQuotaSnapshotsRequest request) throws ServiceException { 3721 try { 3722 final RegionServerSpaceQuotaManager manager = 3723 
regionServer.getRegionServerSpaceQuotaManager(); 3724 final GetSpaceQuotaSnapshotsResponse.Builder builder = 3725 GetSpaceQuotaSnapshotsResponse.newBuilder(); 3726 if (manager != null) { 3727 final Map<TableName,SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots(); 3728 for (Entry<TableName,SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) { 3729 builder.addSnapshots(TableQuotaSnapshot.newBuilder() 3730 .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey())) 3731 .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())) 3732 .build()); 3733 } 3734 } 3735 return builder.build(); 3736 } catch (Exception e) { 3737 throw new ServiceException(e); 3738 } 3739 } 3740 3741 @Override 3742 public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller, 3743 ClearRegionBlockCacheRequest request) { 3744 ClearRegionBlockCacheResponse.Builder builder = 3745 ClearRegionBlockCacheResponse.newBuilder(); 3746 CacheEvictionStatsBuilder stats = CacheEvictionStats.builder(); 3747 List<HRegion> regions = getRegions(request.getRegionList(), stats); 3748 for (HRegion region : regions) { 3749 try { 3750 stats = stats.append(this.regionServer.clearRegionBlockCache(region)); 3751 } catch (Exception e) { 3752 stats.addException(region.getRegionInfo().getRegionName(), e); 3753 } 3754 } 3755 stats.withMaxCacheSize(regionServer.getBlockCache().map(BlockCache::getMaxSize).orElse(0L)); 3756 return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build(); 3757 } 3758 3759 private void executeOpenRegionProcedures(OpenRegionRequest request, 3760 Map<TableName, TableDescriptor> tdCache) { 3761 long masterSystemTime = request.hasMasterSystemTime() ? 
request.getMasterSystemTime() : -1; 3762 for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) { 3763 RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion()); 3764 TableDescriptor tableDesc = tdCache.get(regionInfo.getTable()); 3765 if (tableDesc == null) { 3766 try { 3767 tableDesc = regionServer.getTableDescriptors().get(regionInfo.getTable()); 3768 } catch (IOException e) { 3769 // Here we do not fail the whole method since we also need deal with other 3770 // procedures, and we can not ignore this one, so we still schedule a 3771 // AssignRegionHandler and it will report back to master if we still can not get the 3772 // TableDescriptor. 3773 LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler", 3774 regionInfo.getTable(), e); 3775 } 3776 } 3777 if (regionOpenInfo.getFavoredNodesCount() > 0) { 3778 regionServer.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(), 3779 regionOpenInfo.getFavoredNodesList()); 3780 } 3781 long procId = regionOpenInfo.getOpenProcId(); 3782 if (regionServer.submitRegionProcedure(procId)) { 3783 regionServer.executorService.submit(AssignRegionHandler 3784 .create(regionServer, regionInfo, procId, tableDesc, 3785 masterSystemTime)); 3786 } 3787 } 3788 } 3789 3790 private void executeCloseRegionProcedures(CloseRegionRequest request) { 3791 String encodedName; 3792 try { 3793 encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion()); 3794 } catch (DoNotRetryIOException e) { 3795 throw new UncheckedIOException("Should not happen", e); 3796 } 3797 ServerName destination = request.hasDestinationServer() ? 
3798 ProtobufUtil.toServerName(request.getDestinationServer()) : 3799 null; 3800 long procId = request.getCloseProcId(); 3801 if (regionServer.submitRegionProcedure(procId)) { 3802 regionServer.executorService.submit(UnassignRegionHandler 3803 .create(regionServer, encodedName, procId, false, destination)); 3804 } 3805 } 3806 3807 private void executeProcedures(RemoteProcedureRequest request) { 3808 RSProcedureCallable callable; 3809 try { 3810 callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class) 3811 .getDeclaredConstructor().newInstance(); 3812 } catch (Exception e) { 3813 LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(), 3814 request.getProcId(), e); 3815 regionServer.remoteProcedureComplete(request.getProcId(), e); 3816 return; 3817 } 3818 callable.init(request.getProcData().toByteArray(), regionServer); 3819 LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId()); 3820 regionServer.executeProcedure(request.getProcId(), callable); 3821 } 3822 3823 @Override 3824 @QosPriority(priority = HConstants.ADMIN_QOS) 3825 public ExecuteProceduresResponse executeProcedures(RpcController controller, 3826 ExecuteProceduresRequest request) throws ServiceException { 3827 try { 3828 checkOpen(); 3829 throwOnWrongStartCode(request); 3830 regionServer.getRegionServerCoprocessorHost().preExecuteProcedures(); 3831 if (request.getOpenRegionCount() > 0) { 3832 // Avoid reading from the TableDescritor every time(usually it will read from the file 3833 // system) 3834 Map<TableName, TableDescriptor> tdCache = new HashMap<>(); 3835 request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache)); 3836 } 3837 if (request.getCloseRegionCount() > 0) { 3838 request.getCloseRegionList().forEach(this::executeCloseRegionProcedures); 3839 } 3840 if (request.getProcCount() > 0) { 3841 request.getProcList().forEach(this::executeProcedures); 3842 } 3843 
regionServer.getRegionServerCoprocessorHost().postExecuteProcedures(); 3844 return ExecuteProceduresResponse.getDefaultInstance(); 3845 } catch (IOException e) { 3846 throw new ServiceException(e); 3847 } 3848 } 3849 3850 @Override 3851 @QosPriority(priority = HConstants.ADMIN_QOS) 3852 public SlowLogResponses getSlowLogResponses(final RpcController controller, 3853 final SlowLogResponseRequest request) { 3854 final SlowLogRecorder slowLogRecorder = 3855 this.regionServer.getSlowLogRecorder(); 3856 final List<SlowLogPayload> slowLogPayloads; 3857 slowLogPayloads = slowLogRecorder != null 3858 ? slowLogRecorder.getSlowLogPayloads(request) 3859 : Collections.emptyList(); 3860 SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder() 3861 .addAllSlowLogPayloads(slowLogPayloads) 3862 .build(); 3863 return slowLogResponses; 3864 } 3865 3866 @Override 3867 @QosPriority(priority = HConstants.ADMIN_QOS) 3868 public SlowLogResponses getLargeLogResponses(final RpcController controller, 3869 final SlowLogResponseRequest request) { 3870 final SlowLogRecorder slowLogRecorder = 3871 this.regionServer.getSlowLogRecorder(); 3872 final List<SlowLogPayload> slowLogPayloads; 3873 slowLogPayloads = slowLogRecorder != null 3874 ? 
slowLogRecorder.getLargeLogPayloads(request) 3875 : Collections.emptyList(); 3876 SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder() 3877 .addAllSlowLogPayloads(slowLogPayloads) 3878 .build(); 3879 return slowLogResponses; 3880 } 3881 3882 @Override 3883 @QosPriority(priority = HConstants.ADMIN_QOS) 3884 public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller, 3885 final ClearSlowLogResponseRequest request) { 3886 final SlowLogRecorder slowLogRecorder = 3887 this.regionServer.getSlowLogRecorder(); 3888 boolean slowLogsCleaned = Optional.ofNullable(slowLogRecorder) 3889 .map(SlowLogRecorder::clearSlowLogPayloads).orElse(false); 3890 ClearSlowLogResponses clearSlowLogResponses = ClearSlowLogResponses.newBuilder() 3891 .setIsCleaned(slowLogsCleaned) 3892 .build(); 3893 return clearSlowLogResponses; 3894 } 3895 3896 @VisibleForTesting 3897 public RpcScheduler getRpcScheduler() { 3898 return rpcServer.getScheduler(); 3899 } 3900 3901 protected AccessChecker getAccessChecker() { 3902 return accessChecker; 3903 } 3904 3905 protected ZKPermissionWatcher getZkPermissionWatcher() { 3906 return zkPermissionWatcher; 3907 } 3908}