001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.UncheckedIOException; 024import java.net.BindException; 025import java.net.InetAddress; 026import java.net.InetSocketAddress; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.Iterator; 032import java.util.List; 033import java.util.Map; 034import java.util.Map.Entry; 035import java.util.NavigableMap; 036import java.util.Set; 037import java.util.TreeSet; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentMap; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicBoolean; 042import java.util.concurrent.atomic.AtomicLong; 043import java.util.concurrent.atomic.LongAdder; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.fs.FileSystem; 046import org.apache.hadoop.fs.Path; 047import org.apache.hadoop.hbase.CacheEvictionStats; 048import 
org.apache.hadoop.hbase.CacheEvictionStatsBuilder; 049import org.apache.hadoop.hbase.Cell; 050import org.apache.hadoop.hbase.CellScanner; 051import org.apache.hadoop.hbase.CellUtil; 052import org.apache.hadoop.hbase.DoNotRetryIOException; 053import org.apache.hadoop.hbase.DroppedSnapshotException; 054import org.apache.hadoop.hbase.ExtendedCellScannable; 055import org.apache.hadoop.hbase.ExtendedCellScanner; 056import org.apache.hadoop.hbase.HBaseIOException; 057import org.apache.hadoop.hbase.HBaseRpcServicesBase; 058import org.apache.hadoop.hbase.HConstants; 059import org.apache.hadoop.hbase.MultiActionResultTooLarge; 060import org.apache.hadoop.hbase.NotServingRegionException; 061import org.apache.hadoop.hbase.PrivateCellUtil; 062import org.apache.hadoop.hbase.RegionTooBusyException; 063import org.apache.hadoop.hbase.Server; 064import org.apache.hadoop.hbase.ServerName; 065import org.apache.hadoop.hbase.TableName; 066import org.apache.hadoop.hbase.UnknownScannerException; 067import org.apache.hadoop.hbase.client.Append; 068import org.apache.hadoop.hbase.client.CheckAndMutate; 069import org.apache.hadoop.hbase.client.CheckAndMutateResult; 070import org.apache.hadoop.hbase.client.ClientInternalHelper; 071import org.apache.hadoop.hbase.client.Delete; 072import org.apache.hadoop.hbase.client.Durability; 073import org.apache.hadoop.hbase.client.Get; 074import org.apache.hadoop.hbase.client.Increment; 075import org.apache.hadoop.hbase.client.Mutation; 076import org.apache.hadoop.hbase.client.OperationWithAttributes; 077import org.apache.hadoop.hbase.client.Put; 078import org.apache.hadoop.hbase.client.QueryMetrics; 079import org.apache.hadoop.hbase.client.RegionInfo; 080import org.apache.hadoop.hbase.client.RegionReplicaUtil; 081import org.apache.hadoop.hbase.client.Result; 082import org.apache.hadoop.hbase.client.Row; 083import org.apache.hadoop.hbase.client.Scan; 084import org.apache.hadoop.hbase.client.TableDescriptor; 085import 
org.apache.hadoop.hbase.client.VersionInfoUtil; 086import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics; 087import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 088import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 089import org.apache.hadoop.hbase.exceptions.ScannerResetException; 090import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 091import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 092import org.apache.hadoop.hbase.io.ByteBuffAllocator; 093import org.apache.hadoop.hbase.io.hfile.BlockCache; 094import org.apache.hadoop.hbase.ipc.HBaseRpcController; 095import org.apache.hadoop.hbase.ipc.PriorityFunction; 096import org.apache.hadoop.hbase.ipc.QosPriority; 097import org.apache.hadoop.hbase.ipc.RpcCall; 098import org.apache.hadoop.hbase.ipc.RpcCallContext; 099import org.apache.hadoop.hbase.ipc.RpcCallback; 100import org.apache.hadoop.hbase.ipc.RpcServer; 101import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 102import org.apache.hadoop.hbase.ipc.RpcServerFactory; 103import org.apache.hadoop.hbase.ipc.RpcServerInterface; 104import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 105import org.apache.hadoop.hbase.ipc.ServerRpcController; 106import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics; 107import org.apache.hadoop.hbase.net.Address; 108import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 109import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; 110import org.apache.hadoop.hbase.quotas.OperationQuota; 111import org.apache.hadoop.hbase.quotas.QuotaUtil; 112import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 113import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 114import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 115import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; 116import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease; 117import 
org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException; 118import org.apache.hadoop.hbase.regionserver.Region.Operation; 119import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 120import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 121import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler; 122import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; 123import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; 124import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; 125import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; 126import org.apache.hadoop.hbase.replication.ReplicationUtils; 127import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker; 128import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker; 129import org.apache.hadoop.hbase.security.Superusers; 130import org.apache.hadoop.hbase.security.access.Permission; 131import org.apache.hadoop.hbase.util.Bytes; 132import org.apache.hadoop.hbase.util.DNS; 133import org.apache.hadoop.hbase.util.DNS.ServerType; 134import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 135import org.apache.hadoop.hbase.util.Pair; 136import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 137import org.apache.hadoop.hbase.wal.WAL; 138import org.apache.hadoop.hbase.wal.WALEdit; 139import org.apache.hadoop.hbase.wal.WALKey; 140import org.apache.hadoop.hbase.wal.WALSplitUtil; 141import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 142import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 143import org.apache.yetus.audience.InterfaceAudience; 144import org.slf4j.Logger; 145import org.slf4j.LoggerFactory; 146 147import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 148import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 149import 
org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 150import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 151import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 152import org.apache.hbase.thirdparty.com.google.protobuf.Message; 153import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 154import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 155import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 156import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 157import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 158 159import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 160import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 161import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 173import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 193import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 
214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 
236import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 249import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 250 251/** 252 * Implements the regionserver RPC services. 253 */ 254@InterfaceAudience.Private 255public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer> 256 implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface { 257 258 private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class); 259 260 /** RPC scheduler to use for the region server. */ 261 public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = 262 "hbase.region.server.rpc.scheduler.factory.class"; 263 264 /** 265 * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. 
This
 * configuration exists to prevent the scenario where a time limit is specified to be so
 * restrictive that the time limit is reached immediately (before any cells are scanned).
 */
private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
  "hbase.region.server.rpc.minimum.scan.time.limit.delta";
/**
 * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
 */
static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

/**
 * Whether to reject rows with size > threshold defined by
 * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
 */
private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
  "hbase.rpc.rows.size.threshold.reject";

/**
 * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
 */
private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

// Request counter. (Includes requests that are not serviced by regions.)
// Count only once for requests with multiple actions like multi/caching-scan/replayBatch
final LongAdder requestCount = new LongAdder();

// Request counter for rpc get
final LongAdder rpcGetRequestCount = new LongAdder();

// Request counter for rpc scan
final LongAdder rpcScanRequestCount = new LongAdder();

// Request counter for scans that might end up in full scans
final LongAdder rpcFullScanRequestCount = new LongAdder();

// Request counter for rpc multi
final LongAdder rpcMultiRequestCount = new LongAdder();

// Request counter for rpc mutate
final LongAdder rpcMutateRequestCount = new LongAdder();

// Upper bound on the result size a single scan response may carry.
// NOTE(review): volatile suggests this is refreshed at runtime (e.g. on conf reload) — confirm
// where it is written before relying on that.
private volatile long maxScannerResultSize;

private ScannerIdGenerator scannerIdGenerator;
// All currently open scanners served by this region server, keyed by scanner name.
private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
// Hold the name and last sequence number of a closed scanner for a while. This is used
// to keep compatible for old clients which may send next or close request to a region
// scanner which has already been exhausted. The entries will be removed automatically
// after scannerLeaseTimeoutPeriod.
private final Cache<String, Long> closedScanners;
/**
 * The lease timeout period for client scanners (milliseconds).
 */
private final int scannerLeaseTimeoutPeriod;

/**
 * The RPC timeout period (milliseconds)
 */
private final int rpcTimeout;

/**
 * The minimum allowable delta to use for the scan limit
 */
private final long minimumScanTimeLimitDelta;

/**
 * Row size threshold for multi requests above which a warning is logged
 */
private volatile int rowSizeWarnThreshold;
/*
 * Whether we should reject requests with very high no of rows i.e. beyond threshold defined by
 * rowSizeWarnThreshold
 */
private volatile boolean rejectRowsWithSizeOverThreshold;

// Set while a clear-compaction-queues request is in flight so concurrent requests are rejected.
final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

/**
 * Services launched in RSRpcServices. By default they are on but you can use the below booleans
 * to selectively enable/disable these services (Rare is the case where you would ever turn off
 * one or the other).
 */
public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
  "hbase.regionserver.admin.executorService";
public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
  "hbase.regionserver.client.executorService";
public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
  "hbase.regionserver.client.meta.executorService";
public static final String REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG =
  "hbase.regionserver.bootstrap.nodes.executorService";

/**
 * An Rpc callback for closing a RegionScanner.
 */
private static final class RegionScannerCloseCallBack implements RpcCallback {

  private final RegionScanner scanner;

  public RegionScannerCloseCallBack(RegionScanner scanner) {
    this.scanner = scanner;
  }

  @Override
  public void run() throws IOException {
    // Invoked by the RPC layer once the call completes; releases the scanner's resources.
    this.scanner.close();
  }
}

/**
 * An Rpc callback for doing shipped() call on a RegionScanner.
 */
private class RegionScannerShippedCallBack implements RpcCallback {
  private final String scannerName;
  private final Shipper shipper;
  private final Lease lease;

  public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
    this.scannerName = scannerName;
    this.shipper = shipper;
    this.lease = lease;
  }

  @Override
  public void run() throws IOException {
    this.shipper.shipped();
    // We're done. On way out re-add the above removed lease. The lease was temp removed for this
    // Rpc call and we are at end of the call now. Time to add it back.
    // Only re-add if the scanner is still registered: it may have been closed/expired meanwhile.
    if (scanners.containsKey(scannerName)) {
      if (lease != null) {
        server.getLeaseManager().addLease(lease);
      }
    }
  }
}

/**
 * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
 * completion of multiGets.
 */
static class RegionScannersCloseCallBack implements RpcCallback {
  private final List<RegionScanner> scanners = new ArrayList<>();

  public void addScanner(RegionScanner scanner) {
    this.scanners.add(scanner);
  }

  @Override
  public void run() {
    // Best-effort close of every accumulated scanner; a failure on one does not stop the rest.
    for (RegionScanner scanner : scanners) {
      try {
        scanner.close();
      } catch (IOException e) {
        LOG.error("Exception while closing the scanner " + scanner, e);
      }
    }
  }
}

/**
 * Groups a scanner's name with its holder and the operation quota charged for the scan call.
 */
static class RegionScannerContext {
  final String scannerName;
  final RegionScannerHolder holder;
  final OperationQuota quota;

  RegionScannerContext(String scannerName, RegionScannerHolder holder, OperationQuota quota) {
    this.scannerName = scannerName;
    this.holder = holder;
    this.quota = quota;
  }
}

/**
 * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
 */
static final class RegionScannerHolder {
  // Sequence number the client must present on its next scan call; advanced via CAS.
  private final AtomicLong nextCallSeq = new AtomicLong(0);
  private final RegionScanner s;
  private final HRegion r;
  private final RpcCallback closeCallBack;
  private final RpcCallback shippedCallback;
  private byte[] rowOfLastPartialResult;
  private boolean needCursor;
  private boolean fullRegionScan;
  private final String clientIPAndPort;
  private final String userName;
  private volatile long maxBlockBytesScanned = 0;
  private volatile long prevBlockBytesScanned = 0;
  private volatile long prevBlockBytesScannedDifference = 0;

  RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
    RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
    String clientIPAndPort, String userName) {
    this.s = s;
    this.r = r;
    this.closeCallBack = closeCallBack;
    this.shippedCallback = shippedCallback;
    this.needCursor = needCursor;
    this.fullRegionScan = fullRegionScan;
    this.clientIPAndPort = clientIPAndPort;
    this.userName = userName;
  }

  /** Returns the sequence number expected from the client on its next call. */
  long getNextCallSeq() {
    return nextCallSeq.get();
  }

  /**
   * Advances the call sequence from {@code currentSeq} to {@code currentSeq + 1}.
   * @return false if another request already advanced it, i.e. a concurrent scan call raced us.
   */
  boolean incNextCallSeq(long currentSeq) {
    // Use CAS to prevent multiple scan request running on the same scanner.
    return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
  }

  long getMaxBlockBytesScanned() {
    return maxBlockBytesScanned;
  }

  long getPrevBlockBytesScannedDifference() {
    return prevBlockBytesScannedDifference;
  }

  /**
   * Records the cumulative block bytes scanned after a call, keeping the per-call delta and the
   * running maximum. Not atomic across the three fields; callers serialize per scanner via
   * {@link #incNextCallSeq(long)}.
   */
  void updateBlockBytesScanned(long blockBytesScanned) {
    prevBlockBytesScannedDifference = blockBytesScanned - prevBlockBytesScanned;
    prevBlockBytesScanned = blockBytesScanned;
    if (blockBytesScanned > maxBlockBytesScanned) {
      maxBlockBytesScanned = blockBytesScanned;
    }
  }

  // Should be called only when we need to print lease expired messages otherwise
  // cache the String once made.
  @Override
  public String toString() {
    return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName
      + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString();
  }
}

/**
 * Instantiated as a scanner lease. If the lease times out, the scanner is closed
 */
private class ScannerListener implements LeaseListener {
  private final String scannerName;

  ScannerListener(final String n) {
    this.scannerName = n;
  }

  @Override
  public void leaseExpired() {
    // Remove first so no new scan call can pick the holder up while we tear it down.
    RegionScannerHolder rsh = scanners.remove(this.scannerName);
    if (rsh == null) {
      LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
      return;
    }
    LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
    server.getMetrics().incrScannerLeaseExpired();
    RegionScanner s = rsh.s;
    HRegion region = null;
    try {
      region = server.getRegion(s.getRegionInfo().getRegionName());
      if (region != null && region.getCoprocessorHost() != null) {
        region.getCoprocessorHost().preScannerClose(s);
      }
    } catch (IOException e) {
      LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
    } finally {
      // Close the scanner even if the pre-close hook failed, then run the post-close hook.
      try {
        s.close();
        if (region != null && region.getCoprocessorHost() != null) {
          region.getCoprocessorHost().postScannerClose(s);
        }
      } catch (IOException e) {
        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
      }
    }
  }
}

/** Wraps a successful per-action result into a {@link ResultOrException} at {@code index}. */
private static ResultOrException getResultOrException(final ClientProtos.Result r,
  final int index) {
  return getResultOrException(ResponseConverter.buildActionResult(r), index);
}

/** Wraps a per-action failure into a {@link ResultOrException} at {@code index}. */
private static ResultOrException getResultOrException(final Exception e, final int index) {
  return getResultOrException(ResponseConverter.buildActionResult(e), index);
}

private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
  final int index) {
  return builder.setIndex(index).build();
}

/**
 * Checks for the following pre-checks in order:
 * <ol>
 * <li>RegionServer is running</li>
 * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
 * </ol>
 * @param requestName name of rpc request. Used in reporting failures to provide context.
 * @throws ServiceException If any of the above listed pre-check fails.
 */
private void rpcPreCheck(String requestName) throws ServiceException {
  try {
    checkOpen();
    requirePermission(requestName, Permission.Action.ADMIN);
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}

/** Returns true if the RPC context exists and the client supports cell-block encoding. */
private boolean isClientCellBlockSupport(RpcCallContext context) {
  return context != null && context.isClientCellBlockSupported();
}

/**
 * Adds {@code result} to a mutate response, as a cell block when the client supports it
 * (data travels on the {@code rpcc} cell scanner) or inline in the protobuf otherwise.
 */
private void addResult(final MutateResponse.Builder builder, final Result result,
  final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
  if (result == null) return;
  if (clientCellBlockSupported) {
    builder.setResult(ProtobufUtil.toResultNoData(result));
    rpcc.setCellScanner(result.cellScanner());
  } else {
    ClientProtos.Result pbr = ProtobufUtil.toResult(result);
    builder.setResult(pbr);
  }
}

/**
 * Adds scan {@code results} to a scan response. Marks the response stale when the serving
 * replica is not the default one. With cell-block support only per-result cell counts and
 * partial flags go into the protobuf; the cells themselves ride the controller's cell scanner.
 */
private void addResults(ScanResponse.Builder builder, List<Result> results,
  HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
  builder.setStale(!isDefaultRegion);
  if (results.isEmpty()) {
    return;
  }
  if (clientCellBlockSupported) {
    for (Result res : results) {
      builder.addCellsPerResult(res.size());
      builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
    }
    controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(results));
  } else {
    for (Result res : results) {
      ClientProtos.Result pbr = ProtobufUtil.toResult(res);
      builder.addResults(pbr);
    }
  }
}

/**
 * Executes an atomic check-and-mutate over the given actions (mutations only; a Get action is
 * rejected). Runs the coprocessor pre/post hooks around the region-level operation; a non-null
 * result from {@code preCheckAndMutate} bypasses the region call.
 * @param cellScanner carries the mutation cells; any cells belonging to actions that were never
 *          decoded are skipped in the finally block to keep the scanner aligned
 * @throws IOException on decode failure, quota/size-limit violation, or region-level error
 */
private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
  CellScanner cellScanner, Condition condition, long nonceGroup,
  ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
  // Number of actions whose cells were fully consumed from cellScanner; used to realign it.
  int countOfCompleteMutation = 0;
  try {
    if (!region.getRegionInfo().isMetaRegion()) {
      server.getMemStoreFlusher().reclaimMemStoreMemory();
    }
    List<Mutation> mutations = new ArrayList<>();
    long nonce = HConstants.NO_NONCE;
    for (ClientProtos.Action action : actions) {
      if (action.hasGet()) {
        throw new DoNotRetryIOException(
          "Atomic put and/or delete only, not a Get=" + action.getGet());
      }
      MutationProto mutation = action.getMutation();
      MutationType type = mutation.getMutateType();
      switch (type) {
        case PUT:
          Put put = ProtobufUtil.toPut(mutation, cellScanner);
          ++countOfCompleteMutation;
          checkCellSizeLimit(region, put);
          spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
          mutations.add(put);
          break;
        case DELETE:
          Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
          ++countOfCompleteMutation;
          spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
          mutations.add(del);
          break;
        case INCREMENT:
          Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
          nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
          ++countOfCompleteMutation;
          checkCellSizeLimit(region, increment);
          spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
          mutations.add(increment);
          break;
        case APPEND:
          Append append = ProtobufUtil.toAppend(mutation, cellScanner);
          nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
          ++countOfCompleteMutation;
          checkCellSizeLimit(region, append);
          spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
          mutations.add(append);
          break;
        default:
          throw new DoNotRetryIOException("invalid mutation type : " + type);
      }
    }

    if (mutations.size() == 0) {
      // Nothing to apply; an empty action list trivially "succeeds".
      return new CheckAndMutateResult(true, null);
    } else {
      CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
      CheckAndMutateResult result = null;
      if (region.getCoprocessorHost() != null) {
        result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
      }
      if (result == null) {
        result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
        if (region.getCoprocessorHost() != null) {
          result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
        }
      }
      return result;
    }
  } finally {
    // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
    // even if the malformed cells are not skipped.
    for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
      skipCellsForMutation(actions.get(i), cellScanner);
    }
  }
}

/**
 * Execute an append mutation.
 * @return result to return to client if default operation should be bypassed as indicated by
 *         RegionObserver, null otherwise
 */
private Result append(final HRegion region, final OperationQuota quota,
  final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
  ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
  long before = EnvironmentEdgeManager.currentTime();
  Append append = ProtobufUtil.toAppend(mutation, cellScanner);
  checkCellSizeLimit(region, append);
  spaceQuota.getPolicyEnforcement(region).check(append);
  quota.addMutation(append);
  // Snapshot before the operation so the metric below reflects only this append's block reads.
  long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
  long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
  Result r = region.append(append, nonceGroup, nonce);
  if (server.getMetrics() != null) {
    long blockBytesScanned =
      context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
    server.getMetrics().updateAppend(region, EnvironmentEdgeManager.currentTime() - before,
      blockBytesScanned);
  }
  return r == null ? Result.EMPTY_RESULT : r;
}

/**
 * Execute an increment mutation.
 * @return the post-increment row, or {@link Result#EMPTY_RESULT} when the region returns null
 */
private Result increment(final HRegion region, final OperationQuota quota,
  final MutationProto mutation, final CellScanner cells, long nonceGroup,
  ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
  long before = EnvironmentEdgeManager.currentTime();
  Increment increment = ProtobufUtil.toIncrement(mutation, cells);
  checkCellSizeLimit(region, increment);
  spaceQuota.getPolicyEnforcement(region).check(increment);
  quota.addMutation(increment);
  // Snapshot before the operation so the metric below reflects only this increment's block reads.
  long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
  long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
  Result r = region.increment(increment, nonceGroup, nonce);
  final MetricsRegionServer metricsRegionServer = server.getMetrics();
  if (metricsRegionServer != null) {
    long blockBytesScanned =
      context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
    metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before,
      blockBytesScanned);
  }
  return r == null ? Result.EMPTY_RESULT : r;
}

/**
 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
 * @param cellsToReturn Could be null. May be allocated in this method.
   *                      This is what this method returns as a 'result'.
   * @param closeCallBack the callback to be used with multigets
   * @param context       the current RpcCallContext
   * @return Return the <code>cellScanner</code> passed
   */
  private List<ExtendedCellScannable> doNonAtomicRegionMutation(final HRegion region,
    final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
    final RegionActionResult.Builder builder, List<ExtendedCellScannable> cellsToReturn,
    long nonceGroup, final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
    // one at a time, we instead pass them in batch. Be aware that the corresponding
    // ResultOrException instance that matches each Put or Delete is then added down in the
    // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
    // deferred/batched
    List<ClientProtos.Action> mutations = null;
    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
    IOException sizeIOE = null;
    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
      ResultOrException.newBuilder();
    boolean hasResultOrException = false;
    for (ClientProtos.Action action : actions.getActionList()) {
      hasResultOrException = false;
      resultOrExceptionBuilder.clear();
      try {
        Result r = null;
        long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
        if (
          context != null && context.isRetryImmediatelySupported()
            && (context.getResponseCellSize() > maxQuotaResultSize
              || blockBytesScannedBefore + context.getResponseExceptionSize() > maxQuotaResultSize)
        ) {

          // We're storing the exception since the exception and reason string won't
          // change after the response size limit is reached.
          if (sizeIOE == null) {
            // We don't need the stack un-winding so don't throw the exception.
            // Throwing will kill the JVM's JIT.
            //
            // Instead just create the exception and then store it.
            sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
              + context.getResponseCellSize() + " BlockSize: " + blockBytesScannedBefore);

            // Only report the exception once since there's only one request that
            // caused the exception. Otherwise this number will dominate the exceptions count.
            rpcServer.getMetrics().exception(sizeIOE);
          }

          // Now that there's an exception is known to be created
          // use it for the response.
          //
          // This will create a copy in the builder.
          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
          resultOrExceptionBuilder.setException(pair);
          context.incrementResponseExceptionSize(pair.getSerializedSize());
          resultOrExceptionBuilder.setIndex(action.getIndex());
          builder.addResultOrException(resultOrExceptionBuilder.build());
          // Keep the cell scanner aligned even though this action was not executed.
          skipCellsForMutation(action, cellScanner);
          continue;
        }
        if (action.hasGet()) {
          long before = EnvironmentEdgeManager.currentTime();
          ClientProtos.Get pbGet = action.getGet();
          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
          // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
              + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
              + "reverse Scan.");
          }
          try {
            Get get = ProtobufUtil.toGet(pbGet);
            if (context != null) {
              r = get(get, (region), closeCallBack, context);
            } else {
              r = region.get(get);
            }
          } finally {
            final MetricsRegionServer metricsRegionServer = server.getMetrics();
            if (metricsRegionServer != null) {
              long blockBytesScanned =
                context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
              metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
                blockBytesScanned);
            }
          }
        } else if (action.hasServiceCall()) {
          hasResultOrException = true;
          Message result = execServiceOnRegion(region, action.getServiceCall());
          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
            ClientProtos.CoprocessorServiceResult.newBuilder();
          resultOrExceptionBuilder.setServiceResult(serviceResultBuilder
            .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName())
              // TODO: Copy!!!
              .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
        } else if (action.hasMutation()) {
          MutationType type = action.getMutation().getMutateType();
          if (
            type != MutationType.PUT && type != MutationType.DELETE && mutations != null
              && !mutations.isEmpty()
          ) {
            // Flush out any Puts or Deletes already collected.
            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
              spaceQuotaEnforcement);
            mutations.clear();
          }
          switch (type) {
            case APPEND:
              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement, context);
              break;
            case INCREMENT:
              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement, context);
              break;
            case PUT:
            case DELETE:
              // Collect the individual mutations and apply in a batch
              if (mutations == null) {
                mutations = new ArrayList<>(actions.getActionCount());
              }
              mutations.add(action);
              break;
            default:
              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
          }
        } else {
          throw new HBaseIOException("Unexpected Action type");
        }
        if (r != null) {
          ClientProtos.Result pbResult = null;
          if (isClientCellBlockSupport(context)) {
            pbResult = ProtobufUtil.toResultNoData(r);
            // Hard to guess the size here. Just make a rough guess.
            if (cellsToReturn == null) {
              cellsToReturn = new ArrayList<>();
            }
            cellsToReturn.add(r);
          } else {
            pbResult = ProtobufUtil.toResult(r);
          }
          addSize(context, r);
          hasResultOrException = true;
          resultOrExceptionBuilder.setResult(pbResult);
        }
        // Could get to here and there was no result and no exception. Presumes we added
        // a Put or Delete to the collecting Mutations List for adding later. In this
        // case the corresponding ResultOrException instance for the Put or Delete will be added
        // down in the doNonAtomicBatchOp method call rather than up here.
      } catch (IOException ie) {
        rpcServer.getMetrics().exception(ie);
        hasResultOrException = true;
        NameBytesPair pair = ResponseConverter.buildException(ie);
        resultOrExceptionBuilder.setException(pair);
        context.incrementResponseExceptionSize(pair.getSerializedSize());
      }
      if (hasResultOrException) {
        // Propagate index.
        resultOrExceptionBuilder.setIndex(action.getIndex());
        builder.addResultOrException(resultOrExceptionBuilder.build());
      }
    }
    // Finish up any outstanding mutations
    if (!CollectionUtils.isEmpty(mutations)) {
      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
    }
    return cellsToReturn;
  }

  /**
   * Reject any single cell in the mutation that exceeds the region's configured max cell size.
   * @throws DoNotRetryIOException if a cell is over the limit (retrying cannot help)
   */
  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
    if (r.maxCellSize > 0) {
      CellScanner cells = m.cellScanner();
      while (cells.advance()) {
        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
        if (size > r.maxCellSize) {
          String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of "
            + r.maxCellSize + " bytes";
          LOG.debug(msg);
          throw new DoNotRetryIOException(msg);
        }
      }
    }
  }

  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Just throw the exception. The exception will be caught and then added to region-level
    // exception for RegionAction. Leaving the null to action result is ok since the null
    // result is viewed as failure by hbase client. And the region-level exception will be used
    // to replace the null result. see AsyncRequestFutureImpl#receiveMultiAction and
    // AsyncBatchRpcRetryingCaller#onComplete for more details.
    doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
  }

  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    try {
      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
        spaceQuotaEnforcement, false);
    } catch (IOException e) {
      // Set the exception for each action. The mutations in same RegionAction are group to
      // different batch and then be processed individually. Hence, we don't set the region-level
      // exception here for whole RegionAction.
      for (Action mutation : mutations) {
        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
      }
    }
  }

  /**
   * Execute a list of mutations.
   */
  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
    throws IOException {
    Mutation[] mArray = new Mutation[mutations.size()];
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      /**
       * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions
       * since mutation array may have been reordered. In order to return the right result or
       * exception to the corresponding actions, we need to know which action the mutation belongs
       * to. We can't sort ClientProtos.Action array, since they are bonded to cellscanners.
       */
      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
      int i = 0;
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : mutations) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto m = action.getMutation();
        Mutation mutation;
        switch (m.getMutateType()) {
          case PUT:
            mutation = ProtobufUtil.toPut(m, cells);
            batchContainsPuts = true;
            break;

          case DELETE:
            mutation = ProtobufUtil.toDelete(m, cells);
            batchContainsDelete = true;
            break;

          case INCREMENT:
            mutation = ProtobufUtil.toIncrement(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          case APPEND:
            mutation = ProtobufUtil.toAppend(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          default:
            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
        }
        mutationActionMap.put(mutation, action);
        mArray[i++] = mutation;
        checkCellSizeLimit(region, mutation);
        // Check if a space quota disallows this mutation
        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
        quota.addMutation(mutation);
      }

      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }

      // HBASE-17924
      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
      // order is preserved as its expected from the client
      if (!atomic) {
        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
      }

      OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);

      // When atomic is true, it indicates that the mutateRow API or the batch API with
      // RowMutations is called. In this case, we need to merge the results of the
      // Increment/Append operations if the mutations include those operations, and set the merged
      // result to the first element of the ResultOrException list
      if (atomic) {
        List<ResultOrException> resultOrExceptions = new ArrayList<>();
        List<Result> results = new ArrayList<>();
        for (i = 0; i < codes.length; i++) {
          if (codes[i].getResult() != null) {
            results.add(codes[i].getResult());
          }
          // Elements 1..n-1 get empty placeholder results; element 0 carries the merged result.
          if (i != 0) {
            resultOrExceptions
              .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i));
          }
        }

        if (results.isEmpty()) {
          builder.addResultOrException(
            getResultOrException(ClientProtos.Result.getDefaultInstance(), 0));
        } else {
          // Merge the results of the Increment/Append operations
          List<Cell> cellList = new ArrayList<>();
          for (Result result : results) {
            if (result.rawCells() != null) {
              cellList.addAll(Arrays.asList(result.rawCells()));
            }
          }
          Result result = Result.create(cellList);

          // Set the merged result of the Increment/Append operations to the first element of the
          // ResultOrException list
          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
        }

        builder.addAllResultOrException(resultOrExceptions);
        return;
      }

      for (i = 0; i < codes.length; i++) {
        Mutation currentMutation = mArray[i];
        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
        Exception e;
        // NOTE(review): the default label sits before SUCCESS/STORE_TOO_BUSY. This is legal Java
        // (labeled cases match regardless of default's position) but unusual ordering.
        switch (codes[i].getOperationStatusCode()) {
          case BAD_FAMILY:
            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SANITY_CHECK_FAILURE:
            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          default:
            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SUCCESS:
            ClientProtos.Result result = codes[i].getResult() == null
              ? ClientProtos.Result.getDefaultInstance()
              : ProtobufUtil.toResult(codes[i].getResult());
            builder.addResultOrException(getResultOrException(result, index));
            break;

          case STORE_TOO_BUSY:
            e = new RegionTooBusyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;
        }
      }
    } finally {
      int processedMutationIndex = 0;
      for (Action mutation : mutations) {
        // The non-null mArray[i] means the cell scanner has been read.
        if (mArray[processedMutationIndex++] == null) {
          skipCellsForMutation(mutation, cells);
        }
      }
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  /**
   * Record put/delete batch latencies against the region, if metrics are enabled.
   */
  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
    boolean batchContainsDelete) {
    final MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      if (batchContainsPuts) {
        metricsRegionServer.updatePutBatch(region, after - starttime);
      }
      if (batchContainsDelete) {
        metricsRegionServer.updateDeleteBatch(region, after - starttime);
      }
    }
  }

  /**
   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
   *         exceptionMessage if any
   * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying
   *             edits for secondary replicas any more, see
   *             {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}.
   */
  @Deprecated
  private OperationStatus[] doReplayBatchOp(final HRegion region,
    final List<MutationReplay> mutations, long replaySeqId) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
        MutationReplay m = it.next();

        if (m.getType() == MutationType.PUT) {
          batchContainsPuts = true;
        } else {
          batchContainsDelete = true;
        }

        // Mutations carrying METAFAMILY cells are WAL meta markers (compaction/flush/region
        // event/bulk load), not user edits; handle them here and drop them from the batch.
        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
        if (metaCells != null && !metaCells.isEmpty()) {
          for (Cell metaCell : metaCells) {
            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
            HRegion hRegion = region;
            if (compactionDesc != null) {
              // replay the compaction. Remove the files from stores only if we are the primary
              // region replica (thus own the files)
              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
                replaySeqId);
              continue;
            }
            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
            if (flushDesc != null && !isDefaultReplica) {
              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
              continue;
            }
            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
            if (regionEvent != null && !isDefaultReplica) {
              hRegion.replayWALRegionEventMarker(regionEvent);
              continue;
            }
            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
            if (bulkLoadEvent != null) {
              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
              continue;
            }
          }
          it.remove();
        }
      }
      requestCount.increment();
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
        replaySeqId);
    } finally {
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  private void closeAllScanners() {
    // Close any outstanding scanners. Means they'll get an UnknownScanner
    // exception next time they come in.
    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
      try {
        e.getValue().s.close();
      } catch (IOException ioe) {
        // Best-effort close on shutdown; log and keep closing the rest.
        LOG.warn("Closing scanner " + e.getKey(), ioe);
      }
    }
  }

  // Directly invoked only for testing
  public RSRpcServices(final HRegionServer rs) throws IOException {
    super(rs, rs.getProcessName());
    final Configuration conf = rs.getConfiguration();
    setReloadableGuardrails(conf);
    // Scanner lease period doubles as the expiry for the closed-scanners cache below.
    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
    rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
    closedScanners = CacheBuilder.newBuilder()
      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }

  @Override
  protected boolean defaultReservoirEnabled() {
    return true;
  }

  @Override
  protected ServerType getDNSServerType() {
    return DNS.ServerType.REGIONSERVER;
  }

  @Override
  protected String getHostname(Configuration conf, String defaultHostname) {
    return conf.get("hbase.regionserver.ipc.address", defaultHostname);
  }

  @Override
  protected String getPortConfigName() {
    return HConstants.REGIONSERVER_PORT;
  }

  @Override
  protected int getDefaultPort() {
    return HConstants.DEFAULT_REGIONSERVER_PORT;
  }

  @Override
  protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  protected RpcServerInterface createRpcServer(final Server server,
    final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
    final String name) throws IOException {
    final Configuration conf = server.getConfiguration();
    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
    try {
      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, // use final
        // bindAddress
        // for this
        // server.
        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
    } catch (BindException be) {
      // Rewrap so the operator sees which config property to change; keep the root cause.
      throw new IOException(be.getMessage() + ". To switch ports use the '"
        + HConstants.REGIONSERVER_PORT + "' configuration property.",
        be.getCause() != null ? be.getCause() : be);
    }
  }

  protected Class<?> getRpcSchedulerFactoryClass() {
    final Configuration conf = server.getConfiguration();
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  protected PriorityFunction createPriority() {
    return new RSAnnotationReadingPriorityFunction(this);
  }

  public int getScannersCount() {
    return scanners.size();
  }

  /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
  RegionScanner getScanner(long scannerId) {
    RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId);
    return rsh == null ? null : rsh.s;
  }

  /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null.
   */
  private RegionScannerHolder checkQuotaAndGetRegionScannerContext(long scannerId) {
    return scanners.get(toScannerName(scannerId));
  }

  /** Returns a human-readable "table/region/operation_id" summary for the scanner, or null. */
  public String getScanDetailsWithId(long scannerId) {
    RegionScanner scanner = getScanner(scannerId);
    if (scanner == null) {
      return null;
    }
    StringBuilder builder = new StringBuilder();
    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
    builder.append(" operation_id: ").append(scanner.getOperationId());
    return builder.toString();
  }

  /** Returns a human-readable summary built from the scan request, or null if it can't be. */
  public String getScanDetailsWithRequest(ScanRequest request) {
    try {
      if (!request.hasRegion()) {
        return null;
      }
      Region region = getRegion(request.getRegion());
      StringBuilder builder = new StringBuilder();
      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
      for (NameBytesPair pair : request.getScan().getAttributeList()) {
        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
          break;
        }
      }
      return builder.toString();
    } catch (IOException ignored) {
      // Details are best-effort for logging; a missing region simply yields null.
      return null;
    }
  }

  /**
   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
   */
  long getScannerVirtualTime(long scannerId) {
    RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId);
    return rsh == null ? 0L : rsh.getNextCallSeq();
  }

  /**
   * Method to account for the size of retained cells.
   * @param context rpc call context
   * @param r       result to add size.
   */
  void addSize(RpcCallContext context, Result r) {
    if (context != null && r != null && !r.isEmpty()) {
      for (Cell c : r.rawCells()) {
        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
      }
    }
  }

  /** Returns Remote client's ip and port else null if can't be determined. */
  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
  static String getRemoteClientIpAndPort() {
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    if (rpcCall == null) {
      return HConstants.EMPTY_STRING;
    }
    InetAddress address = rpcCall.getRemoteAddress();
    if (address == null) {
      return HConstants.EMPTY_STRING;
    }
    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
    // scanning than a hostname anyways.
    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
  }

  /** Returns Remote client's username.
   */
  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
  static String getUserName() {
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    if (rpcCall == null) {
      return HConstants.EMPTY_STRING;
    }
    return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING);
  }

  /**
   * Register a new open scanner: create its lease, wire up shipped/close callbacks, and publish
   * the holder in the scanners map under <code>scannerName</code>.
   */
  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
    HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
    Lease lease = server.getLeaseManager().createLease(scannerName, this.scannerLeaseTimeoutPeriod,
      new ScannerListener(scannerName));
    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
    // If the scanner itself is an RpcCallback, it handles its own close notification.
    RpcCallback closeCallback =
      s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s);
    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
      needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
      + scannerName + ", " + existing;
    return rsh;
  }

  private boolean isFullRegionScan(Scan scan, HRegion region) {
    // If the scan start row equals or less than the start key of the region
    // and stop row greater than equals end key (if stop row present)
    // or if the stop row is empty
    // account this as a full region scan
    if (
      Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0
        && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0
          && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)
          || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
    ) {
      return true;
    }
    return false;
  }

  /**
   * Find the HRegion based on a region specifier
   * @param regionSpecifier the region specifier
   * @return the corresponding region
   * @throws IOException if the specifier is not null, but failed to find the region
   */
  public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException {
    return server.getRegion(regionSpecifier.getValue().toByteArray());
  }

  /**
   * Find the List of HRegions based on a list of region specifiers. Specifiers naming regions not
   * served here are not returned; the failure is recorded in <code>stats</code> instead of thrown.
   * @param regionSpecifiers the list of region specifiers
   * @return the corresponding list of regions
   */
  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
    final CacheEvictionStatsBuilder stats) {
    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
    for (RegionSpecifier regionSpecifier : regionSpecifiers) {
      try {
        regions.add(server.getRegion(regionSpecifier.getValue().toByteArray()));
      } catch (NotServingRegionException e) {
        stats.addException(regionSpecifier.getValue().toByteArray(), e);
      }
    }
    return regions;
  }

  PriorityFunction getPriority() {
    return priority;
  }

  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return server.getRegionServerRpcQuotaManager();
  }

  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return server.getRegionServerSpaceQuotaManager();
  }

  void start(ZKWatcher zkWatcher) {
    this.scannerIdGenerator = new ScannerIdGenerator(this.server.getServerName());
    internalStart(zkWatcher);
  }

  void stop() {
    closeAllScanners();
    internalStop();
  }

  /**
   * Called to verify that this server is up and running.
   */
  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
  protected void checkOpen() throws IOException {
    if (server.isAborted()) {
      throw new RegionServerAbortedException("Server " + server.getServerName() + " aborting");
    }
    if (server.isStopped()) {
      throw new RegionServerStoppedException("Server " + server.getServerName() + " stopping");
    }
    if (!server.isDataFileSystemOk()) {
      throw new RegionServerStoppedException("File system not available");
    }
    if (!server.isOnline()) {
      throw new ServerNotRunningYetException(
        "Server " + server.getServerName() + " is not running yet");
    }
  }

  /**
   * By default, put up an Admin and a Client Service. Set booleans
   * <code>hbase.regionserver.admin.executorService</code> and
   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
   * Default is that both are enabled.
   * @return immutable list of blocking services and the security info classes that this server
   *         supports
   */
  protected List<BlockingServiceAndInterface> getServices() {
    boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
    boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
    boolean clientMeta =
      getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
    boolean bootstrapNodes =
      getConfiguration().getBoolean(REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG, true);
    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
    if (client) {
      bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
        ClientService.BlockingInterface.class));
    }
    if (admin) {
      bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
        AdminService.BlockingInterface.class));
    }
    if (clientMeta) {
      bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
        ClientMetaService.BlockingInterface.class));
    }
    if (bootstrapNodes) {
      bssi.add(
        new BlockingServiceAndInterface(BootstrapNodeService.newReflectiveBlockingService(this),
          BootstrapNodeService.BlockingInterface.class));
    }
    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
  }

  /**
   * Close a region on the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public CloseRegionResponse closeRegion(final RpcController controller,
    final CloseRegionRequest request) throws ServiceException {
    // Destination is only set when the close is part of a move.
    final ServerName sn = (request.hasDestinationServer()
      ? ProtobufUtil.toServerName(request.getDestinationServer())
      : null);

    try {
      checkOpen();
      throwOnWrongStartCode(request);
      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());

      requestCount.increment();
      if (sn == null) {
        LOG.info("Close " + encodedRegionName + " without moving");
      } else {
        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
      }
      boolean closed = server.closeRegion(encodedRegionName, false, sn);
      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Compact a region on the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public CompactRegionResponse compactRegion(final RpcController controller,
    final CompactRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      // Quota support is enabled, the requesting user is not system/super user
      // and a quota policy is enforced that disables compactions.
1563 if ( 1564 QuotaUtil.isQuotaEnabled(getConfiguration()) 1565 && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null)) 1566 && this.server.getRegionServerSpaceQuotaManager() 1567 .areCompactionsDisabled(region.getTableDescriptor().getTableName()) 1568 ) { 1569 throw new DoNotRetryIOException( 1570 "Compactions on this region are " + "disabled due to a space quota violation."); 1571 } 1572 region.startRegionOperation(Operation.COMPACT_REGION); 1573 LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString()); 1574 boolean major = request.hasMajor() && request.getMajor(); 1575 if (request.hasFamily()) { 1576 byte[] family = request.getFamily().toByteArray(); 1577 String log = "User-triggered " + (major ? "major " : "") + "compaction for region " 1578 + region.getRegionInfo().getRegionNameAsString() + " and family " 1579 + Bytes.toString(family); 1580 LOG.trace(log); 1581 region.requestCompaction(family, log, Store.PRIORITY_USER, major, 1582 CompactionLifeCycleTracker.DUMMY); 1583 } else { 1584 String log = "User-triggered " + (major ? 
"major " : "") + "compaction for region " 1585 + region.getRegionInfo().getRegionNameAsString(); 1586 LOG.trace(log); 1587 region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY); 1588 } 1589 return CompactRegionResponse.newBuilder().build(); 1590 } catch (IOException ie) { 1591 throw new ServiceException(ie); 1592 } 1593 } 1594 1595 @Override 1596 @QosPriority(priority = HConstants.ADMIN_QOS) 1597 public CompactionSwitchResponse compactionSwitch(RpcController controller, 1598 CompactionSwitchRequest request) throws ServiceException { 1599 rpcPreCheck("compactionSwitch"); 1600 final CompactSplit compactSplitThread = server.getCompactSplitThread(); 1601 requestCount.increment(); 1602 boolean prevState = compactSplitThread.isCompactionsEnabled(); 1603 CompactionSwitchResponse response = 1604 CompactionSwitchResponse.newBuilder().setPrevState(prevState).build(); 1605 if (prevState == request.getEnabled()) { 1606 // passed in requested state is same as current state. No action required 1607 return response; 1608 } 1609 compactSplitThread.switchCompaction(request.getEnabled()); 1610 return response; 1611 } 1612 1613 /** 1614 * Flush a region on the region server. 
1615 * @param controller the RPC controller 1616 * @param request the request 1617 */ 1618 @Override 1619 @QosPriority(priority = HConstants.ADMIN_QOS) 1620 public FlushRegionResponse flushRegion(final RpcController controller, 1621 final FlushRegionRequest request) throws ServiceException { 1622 try { 1623 checkOpen(); 1624 requestCount.increment(); 1625 HRegion region = getRegion(request.getRegion()); 1626 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); 1627 boolean shouldFlush = true; 1628 if (request.hasIfOlderThanTs()) { 1629 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs(); 1630 } 1631 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); 1632 if (shouldFlush) { 1633 boolean writeFlushWalMarker = 1634 request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false; 1635 // Go behind the curtain so we can manage writing of the flush WAL marker 1636 HRegion.FlushResultImpl flushResult = null; 1637 if (request.hasFamily()) { 1638 List<byte[]> families = new ArrayList(); 1639 families.add(request.getFamily().toByteArray()); 1640 TableDescriptor tableDescriptor = region.getTableDescriptor(); 1641 List<String> noSuchFamilies = families.stream() 1642 .filter(f -> !tableDescriptor.hasColumnFamily(f)).map(Bytes::toString).toList(); 1643 if (!noSuchFamilies.isEmpty()) { 1644 throw new NoSuchColumnFamilyException("Column families " + noSuchFamilies 1645 + " don't exist in table " + tableDescriptor.getTableName().getNameAsString()); 1646 } 1647 flushResult = 1648 region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1649 } else { 1650 flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1651 } 1652 boolean compactionNeeded = flushResult.isCompactionNeeded(); 1653 if (compactionNeeded) { 1654 server.getCompactSplitThread().requestSystemCompaction(region, 1655 "Compaction through user triggered flush"); 1656 } 1657 
builder.setFlushed(flushResult.isFlushSucceeded()); 1658 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker); 1659 } 1660 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores()); 1661 return builder.build(); 1662 } catch (DroppedSnapshotException ex) { 1663 // Cache flush can fail in a few places. If it fails in a critical 1664 // section, we get a DroppedSnapshotException and a replay of wal 1665 // is required. Currently the only way to do this is a restart of 1666 // the server. 1667 server.abort("Replay of WAL required. Forcing server shutdown", ex); 1668 throw new ServiceException(ex); 1669 } catch (IOException ie) { 1670 throw new ServiceException(ie); 1671 } 1672 } 1673 1674 @Override 1675 @QosPriority(priority = HConstants.ADMIN_QOS) 1676 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, 1677 final GetOnlineRegionRequest request) throws ServiceException { 1678 try { 1679 checkOpen(); 1680 requestCount.increment(); 1681 Map<String, HRegion> onlineRegions = server.getOnlineRegions(); 1682 List<RegionInfo> list = new ArrayList<>(onlineRegions.size()); 1683 for (HRegion region : onlineRegions.values()) { 1684 list.add(region.getRegionInfo()); 1685 } 1686 list.sort(RegionInfo.COMPARATOR); 1687 return ResponseConverter.buildGetOnlineRegionResponse(list); 1688 } catch (IOException ie) { 1689 throw new ServiceException(ie); 1690 } 1691 } 1692 1693 // Master implementation of this Admin Service differs given it is not 1694 // able to supply detail only known to RegionServer. See note on 1695 // MasterRpcServers#getRegionInfo. 
1696 @Override 1697 @QosPriority(priority = HConstants.ADMIN_QOS) 1698 public GetRegionInfoResponse getRegionInfo(final RpcController controller, 1699 final GetRegionInfoRequest request) throws ServiceException { 1700 try { 1701 checkOpen(); 1702 requestCount.increment(); 1703 HRegion region = getRegion(request.getRegion()); 1704 RegionInfo info = region.getRegionInfo(); 1705 byte[] bestSplitRow; 1706 if (request.hasBestSplitRow() && request.getBestSplitRow()) { 1707 bestSplitRow = region.checkSplit(true).orElse(null); 1708 // when all table data are in memstore, bestSplitRow = null 1709 // try to flush region first 1710 if (bestSplitRow == null) { 1711 region.flush(true); 1712 bestSplitRow = region.checkSplit(true).orElse(null); 1713 } 1714 } else { 1715 bestSplitRow = null; 1716 } 1717 GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder(); 1718 builder.setRegionInfo(ProtobufUtil.toRegionInfo(info)); 1719 if (request.hasCompactionState() && request.getCompactionState()) { 1720 builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState())); 1721 } 1722 builder.setSplittable(region.isSplittable()); 1723 builder.setMergeable(region.isMergeable()); 1724 if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) { 1725 builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow)); 1726 } 1727 return builder.build(); 1728 } catch (IOException ie) { 1729 throw new ServiceException(ie); 1730 } 1731 } 1732 1733 @Override 1734 @QosPriority(priority = HConstants.ADMIN_QOS) 1735 public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request) 1736 throws ServiceException { 1737 1738 List<HRegion> regions; 1739 if (request.hasTableName()) { 1740 TableName tableName = ProtobufUtil.toTableName(request.getTableName()); 1741 regions = server.getRegions(tableName); 1742 } else { 1743 regions = server.getRegions(); 1744 } 1745 List<RegionLoad> rLoads = new 
ArrayList<>(regions.size()); 1746 RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder(); 1747 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder(); 1748 1749 try { 1750 for (HRegion region : regions) { 1751 rLoads.add(server.createRegionLoad(region, regionLoadBuilder, regionSpecifier)); 1752 } 1753 } catch (IOException e) { 1754 throw new ServiceException(e); 1755 } 1756 GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder(); 1757 builder.addAllRegionLoads(rLoads); 1758 return builder.build(); 1759 } 1760 1761 @Override 1762 @QosPriority(priority = HConstants.ADMIN_QOS) 1763 public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller, 1764 ClearCompactionQueuesRequest request) throws ServiceException { 1765 LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/" 1766 + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue"); 1767 ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder(); 1768 requestCount.increment(); 1769 if (clearCompactionQueues.compareAndSet(false, true)) { 1770 final CompactSplit compactSplitThread = server.getCompactSplitThread(); 1771 try { 1772 checkOpen(); 1773 server.getRegionServerCoprocessorHost().preClearCompactionQueues(); 1774 for (String queueName : request.getQueueNameList()) { 1775 LOG.debug("clear " + queueName + " compaction queue"); 1776 switch (queueName) { 1777 case "long": 1778 compactSplitThread.clearLongCompactionsQueue(); 1779 break; 1780 case "short": 1781 compactSplitThread.clearShortCompactionsQueue(); 1782 break; 1783 default: 1784 LOG.warn("Unknown queue name " + queueName); 1785 throw new IOException("Unknown queue name " + queueName); 1786 } 1787 } 1788 server.getRegionServerCoprocessorHost().postClearCompactionQueues(); 1789 } catch (IOException ie) { 1790 throw new ServiceException(ie); 1791 } finally { 1792 clearCompactionQueues.set(false); 1793 } 1794 } 
else { 1795 LOG.warn("Clear compactions queue is executing by other admin."); 1796 } 1797 return respBuilder.build(); 1798 } 1799 1800 /** 1801 * Get some information of the region server. 1802 * @param controller the RPC controller 1803 * @param request the request 1804 */ 1805 @Override 1806 @QosPriority(priority = HConstants.ADMIN_QOS) 1807 public GetServerInfoResponse getServerInfo(final RpcController controller, 1808 final GetServerInfoRequest request) throws ServiceException { 1809 try { 1810 checkOpen(); 1811 } catch (IOException ie) { 1812 throw new ServiceException(ie); 1813 } 1814 requestCount.increment(); 1815 int infoPort = server.getInfoServer() != null ? server.getInfoServer().getPort() : -1; 1816 return ResponseConverter.buildGetServerInfoResponse(server.getServerName(), infoPort); 1817 } 1818 1819 @Override 1820 @QosPriority(priority = HConstants.ADMIN_QOS) 1821 public GetStoreFileResponse getStoreFile(final RpcController controller, 1822 final GetStoreFileRequest request) throws ServiceException { 1823 try { 1824 checkOpen(); 1825 HRegion region = getRegion(request.getRegion()); 1826 requestCount.increment(); 1827 Set<byte[]> columnFamilies; 1828 if (request.getFamilyCount() == 0) { 1829 columnFamilies = region.getTableDescriptor().getColumnFamilyNames(); 1830 } else { 1831 columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR); 1832 for (ByteString cf : request.getFamilyList()) { 1833 columnFamilies.add(cf.toByteArray()); 1834 } 1835 } 1836 int nCF = columnFamilies.size(); 1837 List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][])); 1838 GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder(); 1839 builder.addAllStoreFile(fileList); 1840 return builder.build(); 1841 } catch (IOException ie) { 1842 throw new ServiceException(ie); 1843 } 1844 } 1845 1846 private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException { 1847 if (!request.hasServerStartCode()) { 1848 
      // Tolerate legacy callers that omit the start code; only validate when it is present.
      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  /** Validates the start code carried by a close-region request, if any. */
  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  /**
   * Rejects the RPC when its start code does not match this server's start code, i.e. the
   * request was meant for an earlier incarnation of this region server.
   */
  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
    // check that we are the same server that this RPC is intended for.
    if (server.getServerName().getStartcode() != serverStartCode) {
      throw new ServiceException(new DoNotRetryIOException(
        "This RPC was intended for a " + "different server with startCode: " + serverStartCode
          + ", this server is: " + server.getServerName()));
    }
  }

  /** Validates the start codes of every open/close sub-request in the batch. */
  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
    if (req.getOpenRegionCount() > 0) {
      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
        throwOnWrongStartCode(openReq);
      }
    }
    if (req.getCloseRegionCount() > 0) {
      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
        throwOnWrongStartCode(closeReq);
      }
    }
  }

  /**
   * Open asynchronously a region or a set of regions on the region server. The opening is
   * coordinated by ZooKeeper, and this method requires the znode to be created before being called.
   * As a consequence, this method should be called only from the master.
   * <p>
   * Different manages states for the region are:
   * </p>
   * <ul>
   * <li>region not opened: the region opening will start asynchronously.</li>
   * <li>a close is already in progress: this is considered as an error.</li>
   * <li>an open is already in progress: this new open request will be ignored. This is important
   * because the Master can do multiple requests if it crashes.</li>
   * <li>the region is already opened: this new open request will be ignored.</li>
   * </ul>
   * <p>
   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
   * errors are put in the response as FAILED_OPENING.
   * </p>
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public OpenRegionResponse openRegion(final RpcController controller,
    final OpenRegionRequest request) throws ServiceException {
    requestCount.increment();
    throwOnWrongStartCode(request);

    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
    final int regionCount = request.getOpenInfoCount();
    // Cache of table descriptors, keyed by table, shared across the regions in this request.
    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
    final boolean isBulkAssign = regionCount > 1;
    try {
      checkOpen();
    } catch (IOException ie) {
      // Special case: a single-region request for meta is allowed to wait for the server to
      // come online instead of failing immediately; any other request fails here.
      TableName tableName = null;
      if (regionCount == 1) {
        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
          request.getOpenInfo(0).getRegion();
        if (ri != null) {
          tableName = ProtobufUtil.toTableName(ri.getTableName());
        }
      }
      if (!TableName.META_TABLE_NAME.equals(tableName)) {
        throw new ServiceException(ie);
      }
      // We are assigning meta, wait a little for regionserver to finish initialization.
      // Default to quarter of RPC timeout
      int timeout = server.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
      synchronized (server.online) {
        try {
          while (
            EnvironmentEdgeManager.currentTime() <= endTime && !server.isStopped()
              && !server.isOnline()
          ) {
            server.online.wait(server.getMsgInterval());
          }
          checkOpen();
        } catch (InterruptedException t) {
          Thread.currentThread().interrupt();
          throw new ServiceException(t);
        } catch (IOException e) {
          throw new ServiceException(e);
        }
      }
    }

    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;

    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableDescriptor htd;
      try {
        String encodedName = region.getEncodedName();
        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
        final HRegion onlineRegion = server.getRegion(encodedName);
        if (onlineRegion != null) {
          // The region is already online. This should not happen any more.
          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
            + ", which is already online";
          LOG.warn(error);
          // server.abort(error);
          // throw new IOException(error);
          builder.addOpeningState(RegionOpeningState.OPENED);
          continue;
        }
        LOG.info("Open " + region.getRegionNameAsString());

        // previous == null: no op in progress; TRUE: an open in progress; FALSE: a close in
        // progress (an error for an OPEN request).
        final Boolean previous =
          server.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);

        if (Boolean.FALSE.equals(previous)) {
          if (server.getRegion(encodedName) != null) {
            // There is a close in progress. This should not happen any more.
            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
              + ", which we are already trying to CLOSE";
            server.abort(error);
            throw new IOException(error);
          }
          // Overwrite the FALSE (closing) marker with TRUE (opening).
          server.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
        }

        if (Boolean.TRUE.equals(previous)) {
          // An open is in progress. This is supported, but let's log this.
          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
            + ", which we are already trying to OPEN"
            + " - ignoring this new request for this region.");
        }

        // We are opening this region. If it moves back and forth for whatever reason, we don't
        // want to keep returning the stale moved record while we are opening/if we close again.
        server.removeFromMovedRegions(region.getEncodedName());

        if (previous == null || !previous.booleanValue()) {
          htd = htds.get(region.getTable());
          if (htd == null) {
            htd = server.getTableDescriptors().get(region.getTable());
            htds.put(region.getTable(), htd);
          }
          if (htd == null) {
            throw new IOException("Missing table descriptor for " + region.getEncodedName());
          }
          // If there is no action in progress, we can submit a specific handler.
          // Need to pass the expected version in the constructor.
          if (server.getExecutorService() == null) {
            LOG.info("No executor executorService; skipping open request");
          } else {
            if (region.isMetaRegion()) {
              server.getExecutorService()
                .submit(new OpenMetaHandler(server, server, region, htd, masterSystemTime));
            } else {
              if (regionOpenInfo.getFavoredNodesCount() > 0) {
                server.updateRegionFavoredNodesMapping(region.getEncodedName(),
                  regionOpenInfo.getFavoredNodesList());
              }
              // High-priority tables and system tables go to the priority handler pool.
              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
                server.getExecutorService().submit(
                  new OpenPriorityRegionHandler(server, server, region, htd, masterSystemTime));
              } else {
                server.getExecutorService()
                  .submit(new OpenRegionHandler(server, server, region, htd, masterSystemTime));
              }
            }
          }
        }

        builder.addOpeningState(RegionOpeningState.OPENED);
      } catch (IOException ie) {
        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
        if (isBulkAssign) {
          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
        } else {
          throw new ServiceException(ie);
        }
      }
    }
    return builder.build();
  }

  /**
   * Warmup a region on this server. This method should only be called by Master. It synchronously
   * opens the region and closes the region bringing the most important pages in cache.
2049 */ 2050 @Override 2051 public WarmupRegionResponse warmupRegion(final RpcController controller, 2052 final WarmupRegionRequest request) throws ServiceException { 2053 final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo()); 2054 WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance(); 2055 try { 2056 checkOpen(); 2057 String encodedName = region.getEncodedName(); 2058 byte[] encodedNameBytes = region.getEncodedNameAsBytes(); 2059 final HRegion onlineRegion = server.getRegion(encodedName); 2060 if (onlineRegion != null) { 2061 LOG.info("{} is online; skipping warmup", region); 2062 return response; 2063 } 2064 TableDescriptor htd = server.getTableDescriptors().get(region.getTable()); 2065 if (server.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) { 2066 LOG.info("{} is in transition; skipping warmup", region); 2067 return response; 2068 } 2069 LOG.info("Warmup {}", region.getRegionNameAsString()); 2070 HRegion.warmupHRegion(region, htd, server.getWAL(region), server.getConfiguration(), server, 2071 null); 2072 } catch (IOException ie) { 2073 LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie); 2074 throw new ServiceException(ie); 2075 } 2076 2077 return response; 2078 } 2079 2080 private ExtendedCellScanner getAndReset(RpcController controller) { 2081 HBaseRpcController hrc = (HBaseRpcController) controller; 2082 ExtendedCellScanner cells = hrc.cellScanner(); 2083 hrc.setCellScanner(null); 2084 return cells; 2085 } 2086 2087 /** 2088 * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is 2089 * that the given mutations will be durable on the receiving RS if this method returns without any 2090 * exception. 2091 * @param controller the RPC controller 2092 * @param request the request 2093 * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for 2094 * compatibility with old region replica implementation. 
Now we will use
   *             {@code replicateToReplica} method instead.
   */
  @Deprecated
  @Override
  @QosPriority(priority = HConstants.REPLAY_QOS)
  public ReplicateWALEntryResponse replay(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    ExtendedCellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      // All entries in one request must target the same region; use the first entry's region.
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      RegionCoprocessorHost coprocessorHost =
        ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
          ? region.getCoprocessorHost()
          : null; // do not invoke coprocessors if this is a secondary region replica
      // Entries handed to pre/postWALRestore; only populated when coprocessors are invoked.
      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();

      // Skip adding the edits to WAL if this is a secondary region replica
      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;

      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException("Replay request contains entries from multiple "
            + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
            + entry.getKey().getEncodedRegionName());
        }
        // Report replayed nonces so retried client operations are still deduplicated.
        if (server.nonceManager != null && isPrimary) {
          long nonceGroup =
            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          server.nonceManager.reportOperationFromWal(nonceGroup, nonce,
            entry.getKey().getWriteTime());
        }
        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
        List<MutationReplay> edits =
          WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
        if (coprocessorHost != null) {
          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
          // KeyValue.
          if (
            coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
              walEntry.getSecond())
          ) {
            // if bypass this log entry, ignore it ...
            continue;
          }
          walEntries.add(walEntry);
        }
        if (edits != null && !edits.isEmpty()) {
          // HBASE-17924
          // sort to improve lock efficiency
          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
          long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
            ? entry.getKey().getOrigSequenceNumber()
            : entry.getKey().getLogSequenceNumber();
          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
          // check if it's a partial success
          for (int i = 0; result != null && i < result.length; i++) {
            if (result[i] != OperationStatus.SUCCESS) {
              throw new IOException(result[i].getExceptionMsg());
            }
          }
        }
      }

      // sync wal at the end because ASYNC_WAL is used above
      WAL wal = region.getWAL();
      if (wal != null) {
        wal.sync();
      }

      if (coprocessorHost != null) {
        for (Pair<WALKey, WALEdit> entry : walEntries) {
          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
            entry.getSecond());
        }
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record replay latency whether we succeeded or failed.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
      }
    }
  }

  /**
   * Replay the given changes on a secondary replica
   */
  @Override
  public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
    ReplicateWALEntryRequest request) throws ServiceException {
    CellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      // This endpoint is only for secondary replicas; a primary target indicates a caller bug.
      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
        throw new DoNotRetryIOException(
          "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
      }
2211 for (WALEntry entry : entries) { 2212 if (!regionName.equals(entry.getKey().getEncodedRegionName())) { 2213 throw new NotServingRegionException( 2214 "ReplicateToReplica request contains entries from multiple " + "regions. First region:" 2215 + regionName.toStringUtf8() + " , other region:" 2216 + entry.getKey().getEncodedRegionName()); 2217 } 2218 region.replayWALEntry(entry, cells); 2219 } 2220 return ReplicateWALEntryResponse.newBuilder().build(); 2221 } catch (IOException ie) { 2222 throw new ServiceException(ie); 2223 } 2224 } 2225 2226 private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException { 2227 ReplicationSourceService replicationSource = server.getReplicationSourceService(); 2228 if (replicationSource == null || entries.isEmpty()) { 2229 return; 2230 } 2231 // We can ensure that all entries are for one peer, so only need to check one entry's 2232 // table name. if the table hit sync replication at peer side and the peer cluster 2233 // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply 2234 // those entries according to the design doc. 2235 TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray()); 2236 if ( 2237 replicationSource.getSyncReplicationPeerInfoProvider().checkState(table, 2238 RejectReplicationRequestStateChecker.get()) 2239 ) { 2240 throw new DoNotRetryIOException( 2241 "Reject to apply to sink cluster because sync replication state of sink cluster " 2242 + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table); 2243 } 2244 } 2245 2246 /** 2247 * Replicate WAL entries on the region server. 
2248 * @param controller the RPC controller 2249 * @param request the request 2250 */ 2251 @Override 2252 @QosPriority(priority = HConstants.REPLICATION_QOS) 2253 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, 2254 final ReplicateWALEntryRequest request) throws ServiceException { 2255 try { 2256 checkOpen(); 2257 if (server.getReplicationSinkService() != null) { 2258 requestCount.increment(); 2259 List<WALEntry> entries = request.getEntryList(); 2260 checkShouldRejectReplicationRequest(entries); 2261 ExtendedCellScanner cellScanner = getAndReset(controller); 2262 server.getRegionServerCoprocessorHost().preReplicateLogEntries(); 2263 server.getReplicationSinkService().replicateLogEntries(entries, cellScanner, 2264 request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), 2265 request.getSourceHFileArchiveDirPath()); 2266 server.getRegionServerCoprocessorHost().postReplicateLogEntries(); 2267 return ReplicateWALEntryResponse.newBuilder().build(); 2268 } else { 2269 throw new ServiceException("Replication services are not initialized yet"); 2270 } 2271 } catch (IOException ie) { 2272 throw new ServiceException(ie); 2273 } 2274 } 2275 2276 /** 2277 * Roll the WAL writer of the region server. 
   * @param controller the RPC controller
   * @param request the request
   * @return an empty response; the response carries no payload
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public RollWALWriterResponse rollWALWriter(final RpcController controller,
    final RollWALWriterRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      server.getRegionServerCoprocessorHost().preRollWALWriterRequest();
      // Requests a roll of all WALs; the roll itself is handled by the wal roller thread.
      server.getWalRoller().requestRollAll();
      server.getRegionServerCoprocessorHost().postRollWALWriterRequest();
      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Stop the region server.
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public StopServerResponse stopServer(final RpcController controller,
    final StopServerRequest request) throws ServiceException {
    // rpcPreCheck performs the open/permission validation for admin-only calls.
    rpcPreCheck("stopServer");
    requestCount.increment();
    String reason = request.getReason();
    server.stop(reason);
    return StopServerResponse.newBuilder().build();
  }

  /**
   * Updates the favored-node mapping for each region named in the request. The response reports
   * how many region entries were present in the request (including ones skipped because they
   * carried no favored nodes).
   */
  @Override
  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
    UpdateFavoredNodesRequest request) throws ServiceException {
    rpcPreCheck("updateFavoredNodes");
    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
      // Only update mappings for regions that actually carry favored nodes.
      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
        server.updateRegionFavoredNodesMapping(hri.getEncodedName(),
          regionUpdateInfo.getFavoredNodesList());
      }
    }
    respBuilder.setResponse(openInfoList.size());
    return respBuilder.build();
  }

  /**
   * Atomically bulk load several HFiles into an open region
   * @return true if successful, false if failed recoverably (no action taken)
   * @throws ServiceException if failed unrecoverably
   */
  @Override
  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
    final BulkLoadHFileRequest request) throws ServiceException {
    long start = EnvironmentEdgeManager.currentTime();
    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
    if (clusterIds.contains(this.server.getClusterId())) {
      // This cluster's id is already in the list: the load has been applied here before
      // (e.g. looped back through replication), so report success without re-loading.
      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
    } else {
      // Tag the request with our cluster id so downstream clusters can detect loops.
      clusterIds.add(this.server.getClusterId());
    }
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
      // Negative size means "no quota accounting" (see the size check further below).
      long sizeToBeLoaded = -1;

      // Check to see if this bulk load would exceed the space quota for this table
      if (spaceQuotaEnabled) {
        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
        SpaceViolationPolicyEnforcement enforcement =
          activeSpaceQuotas.getPolicyEnforcement(region);
        if (enforcement != null) {
          // Bulk loads must still be atomic. We must enact all or none.
          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
          for (FamilyPath familyPath : request.getFamilyPathList()) {
            filePaths.add(familyPath.getPath());
          }
          // Check if the batch of files exceeds the current quota
          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
        }
      }
      // secure bulk load
      Map<byte[], List<Path>> map =
        server.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
      builder.setLoaded(map != null);
      if (map != null) {
        // Treat any negative size as a flag to "ignore" updating the region size as that is
        // not possible to occur in real life (cannot bulk load a file with negative size)
        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
              + sizeToBeLoaded + " bytes");
          }
          // Inform space quotas of the new files for this region
          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
            sizeToBeLoaded);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record bulk-load latency even on failure.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
      }
    }
  }

  /**
   * Prepares a secure bulk load for the given region and returns the bulk token that the client
   * must present on the subsequent load/cleanup calls.
   */
  @Override
  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
    PrepareBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      String bulkToken = server.getSecureBulkLoadManager().prepareBulkLoad(region, request);
      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
      builder.setBulkToken(bulkToken);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Cleans up the staging area associated with a previously prepared secure bulk load.
   */
  @Override
  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
    CleanupBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      server.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
      return CleanupBulkLoadResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Executes a region coprocessor endpoint call and wraps the serialized result message into the
   * response.
   */
  @Override
  public CoprocessorServiceResponse execService(final RpcController controller,
    final CoprocessorServiceRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      Message result = execServiceOnRegion(region, request.getCall());
      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
        region.getRegionInfo().getRegionName()));
      // TODO: COPIES!!!!!! (result.toByteArray() plus ByteString.copyFrom each copy the payload)
      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
        org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Resolves the filesystem to read bulk-load files from: the first file path's filesystem when
   * paths are given, otherwise this server's own filesystem.
   */
  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
    if (filePaths.isEmpty()) {
      // local hdfs
      return server.getFileSystem();
    }
    // source hdfs
    return new Path(filePaths.get(0)).getFileSystem(server.getConfiguration());
  }

  private Message execServiceOnRegion(HRegion region,
    final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
    // ignore the passed in controller (from the serialized call)
    ServerRpcController execController = new ServerRpcController();
    return region.execService(execController, serviceCall);
  }

  /**
   * Returns true when client requests against this region must be rejected because the region's
   * table is in sync-replication STANDBY state on this cluster.
   */
  private boolean shouldRejectRequestsFromClient(HRegion region) {
    TableName table = region.getRegionInfo().getTable();
    ReplicationSourceService service = server.getReplicationSourceService();
    return service != null && service.getSyncReplicationPeerInfoProvider().checkState(table,
      RejectRequestsFromClientStateChecker.get());
  }

  /** Throws if the region's table is in sync-replication STANDBY state on this cluster. */
  private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
    if (shouldRejectRequestsFromClient(region)) {
      throw new DoNotRetryIOException(
        region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state.");
    }
  }

  /**
   * Get data from a table.
   * @param controller the RPC controller
   * @param request the get request
   */
  @Override
  public GetResponse get(final RpcController controller, final GetRequest request)
    throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    OperationQuota quota = null;
    HRegion region = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    try {
      checkOpen();
      requestCount.increment();
      rpcGetRequestCount.increment();
      region = getRegion(request.getRegion());
      rejectIfInStandByState(region);

      GetResponse.Builder builder = GetResponse.newBuilder();
      ClientProtos.Get get = request.getGet();
      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
          + "reverse Scan.");
      }
      Boolean existence = null;
      Result r = null;
      quota = getRpcQuotaManager().checkBatchQuota(region, OperationQuota.OperationType.GET);

      Get clientGet = ProtobufUtil.toGet(get);
      // Existence-only gets may be fully answered by the preExists coprocessor hook.
      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
        existence = region.getCoprocessorHost().preExists(clientGet);
      }
      if (existence == null) {
        if (context != null) {
          r = get(clientGet, (region), null, context);
        } else {
          // for test purpose
          r = region.get(clientGet);
        }
        if (get.getExistenceOnly()) {
          boolean exists = r.getExists();
          if (region.getCoprocessorHost() != null) {
            exists = region.getCoprocessorHost().postExists(clientGet, exists);
          }
          existence = exists;
        }
      }
      if (existence != null) {
        ClientProtos.Result pbr =
          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
        builder.setResult(pbr);
      } else if (r != null) {
        ClientProtos.Result pbr;
        // For capable clients, ship the cells via the cell block and send a data-less pb Result.
        if (
          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
        ) {
          pbr = ProtobufUtil.toResultNoData(r);
          ((HBaseRpcController) controller).setCellScanner(
            PrivateCellUtil.createExtendedCellScanner(ClientInternalHelper.getExtendedRawCells(r)));
          addSize(context, r);
        } else {
          pbr = ProtobufUtil.toResult(r);
        }
        builder.setResult(pbr);
      }
      // r.cells is null for a table.exists(get) call
      if (r != null && r.rawCells() != null) {
        quota.addGetResult(r);
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record get latency and release the quota even on failure.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null && region != null) {
        long blockBytesScanned = context != null ? context.getBlockBytesScanned() : 0;
        metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
          blockBytesScanned);
      }
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Server-side get that routes through a region scanner so the scanner lifecycle can be tied to
   * the RPC call (or to the multi() close callback).
   * @param closeCallBack null for a standalone get(); non-null when called from multi(), which
   *          closes all its scanners at once
   * @param context the current rpc call context; must be non-null here (callers pass the current
   *          call)
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
    RpcCallContext context) throws IOException {
    region.prepareGet(get);
    // Results from a non-default replica are marked stale.
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        region.metricsUpdateForGet();
        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
          stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    long blockBytesScannedBefore = context.getBlockBytesScanned();
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet();

    Result r =
      Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
    if (get.isQueryMetricsEnabled()) {
      // Only the block bytes scanned by this get, not the whole rpc call so far.
      long blockBytesScanned = context.getBlockBytesScanned() - blockBytesScannedBefore;
      r.setMetrics(new QueryMetrics(blockBytesScanned));
    }
    return r;
  }

  /**
   * Warns (and optionally rejects, per rejectRowsWithSizeOverThreshold) when the total action
   * count of a multi request exceeds rowSizeWarnThreshold. See HBASE-18023.
   */
  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
    int sum = 0;
    String firstRegionName = null;
    for (RegionAction regionAction : request.getRegionActionList()) {
      // Capture the first region's name for log/error context.
      if (sum == 0) {
        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
      }
      sum += regionAction.getActionCount();
    }
    if (sum > rowSizeWarnThreshold) {
      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
        + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
        + RpcServer.getRequestUserName().orElse(null) + "/"
        + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
      if (rejectRowsWithSizeOverThreshold) {
        throw new ServiceException(
          "Rejecting large batch operation for current batch with firstRegionName: "
            + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
            + rowSizeWarnThreshold);
      }
    }
  }

  /**
   * Records a whole-RegionAction failure into the multi response and skips the cells in the
   * cell scanner that belonged to the failed actions, keeping the scanner aligned for the next
   * RegionAction.
   */
  private void failRegionAction(MultiResponse.Builder responseBuilder,
    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
    CellScanner cellScanner, Throwable error) {
    rpcServer.getMetrics().exception(error);
    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
    // All Mutations in this RegionAction not executed as we can not see the Region online here
    // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
    // corresponding to these Mutations.
    if (cellScanner != null) {
      skipCellsForMutations(regionAction.getActionList(), cellScanner);
    }
  }

  /**
   * Returns true if the action looks like it was issued by replication: a PUT or DELETE mutation
   * carrying the replication marker attribute.
   */
  private boolean isReplicationRequest(Action action) {
    // replication request can only be put or delete.
    if (!action.hasMutation()) {
      return false;
    }
    MutationProto mutation = action.getMutation();
    MutationType type = mutation.getMutateType();
    if (type != MutationType.PUT && type != MutationType.DELETE) {
      return false;
    }
    // replication will set a special attribute so we can make use of it to decide whether a request
    // is for replication.
    return mutation.getAttributeList().stream().map(p -> p.getName())
      .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
  }

  /**
   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
   * @param rpcc the RPC controller
   * @param request the multi request
   */
  @Override
  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
    throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }

    checkBatchSizeAndLogLargeSize(request);

    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? getAndReset(controller) : null;

    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;

    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
    this.rpcMultiRequestCount.increment();
    this.requestCount.increment();
    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();

    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
    // following logic is for backward compatibility as old clients still use
    // MultiRequest#condition in case of checkAndMutate with RowMutations.
    if (request.hasCondition()) {
      if (request.getRegionActionList().isEmpty()) {
        // If the region action list is empty, do nothing.
        responseBuilder.setProcessed(true);
        return responseBuilder.build();
      }

      RegionAction regionAction = request.getRegionAction(0);

      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
      // we can assume regionAction.getAtomic() is true here.
      assert regionAction.getAtomic();

      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
          regionAction.hasCondition());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        return responseBuilder.build();
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
        // We only allow replication in standby state and it will not set the atomic flag.
        if (rejectIfFromClient) {
          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
            new DoNotRetryIOException(
              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
          return responseBuilder.build();
        }

        try {
          CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
            cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
          responseBuilder.setProcessed(result.isSuccess());
          ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
            ClientProtos.ResultOrException.newBuilder();
          for (int i = 0; i < regionAction.getActionCount(); i++) {
            // To unify the response format with doNonAtomicRegionMutation and read through
            // client's AsyncProcess we have to add an empty result instance per operation
            resultOrExceptionOrBuilder.clear();
            resultOrExceptionOrBuilder.setIndex(i);
            regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
          }
        } catch (IOException e) {
          rpcServer.getMetrics().exception(e);
          // As it's an atomic operation with a condition, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        }
      } finally {
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
      }
      return responseBuilder.build();
    }

    // this will contain all the cells that we need to return. It's created later, if needed.
    List<ExtendedCellScannable> cellsToReturn = null;
    RegionScannersCloseCallBack closeCallBack = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
      new HashMap<>(request.getRegionActionCount());

    for (RegionAction regionAction : request.getRegionActionList()) {
      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();
      regionActionResultBuilder.clear();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
          regionAction.hasCondition());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        continue; // For this region it's a failure.
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);

        if (regionAction.hasCondition()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }

          try {
            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
              ClientProtos.ResultOrException.newBuilder();
            if (regionAction.getActionCount() == 1) {
              // Single-action checkAndMutate: result (and metrics) go into index 0.
              CheckAndMutateResult result =
                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              resultOrExceptionOrBuilder.setIndex(0);
              if (result.getResult() != null) {
                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
              }

              if (result.getMetrics() != null) {
                resultOrExceptionOrBuilder
                  .setMetrics(ProtobufUtil.toQueryMetrics(result.getMetrics()));
              }

              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
            } else {
              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              for (int i = 0; i < regionAction.getActionCount(); i++) {
                if (i == 0 && result.getResult() != null) {
                  // Set the result of the Increment/Append operations to the first element of the
                  // ResultOrException list
                  resultOrExceptionOrBuilder.setIndex(i);
                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
                  continue;
                }
                // To unify the response format with doNonAtomicRegionMutation and read through
                // client's AsyncProcess we have to add an empty result instance per operation
                resultOrExceptionOrBuilder.clear();
                resultOrExceptionOrBuilder.setIndex(i);
                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
              }
            }
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's an atomic operation with a condition, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }
          try {
            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
              cellScanner, nonceGroup, spaceQuotaEnforcement);
            regionActionResultBuilder.setProcessed(true);
            // We no longer use MultiResponse#processed. Instead, we use
            // RegionActionResult#processed. This is for backward compatibility for old clients.
            responseBuilder.setProcessed(true);
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's atomic, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else {
          if (
            rejectIfFromClient && regionAction.getActionCount() > 0
              && !isReplicationRequest(regionAction.getAction(0))
          ) {
            // fail if it is not a replication request
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }
          // doNonAtomicRegionMutation manages the exception internally
          if (context != null && closeCallBack == null) {
            // An RpcCallBack that creates a list of scanners that needs to perform callBack
            // operation on completion of multiGets.
            // Set this only once
            closeCallBack = new RegionScannersCloseCallBack();
            context.setCallBack(closeCallBack);
          }
          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
            spaceQuotaEnforcement);
        }
      } finally {
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        regionStats.put(regionSpecifier, regionLoadStats);
      }
    }
    // Load the controller with the Cells to return.
    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
      controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(cellsToReturn));
    }

    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
      builder.addRegion(stat.getKey());
      builder.addStat(stat.getValue());
    }
    responseBuilder.setRegionStatistics(builder);
    return responseBuilder.build();
  }

  /** Skips the cells belonging to every mutation in the given actions; see skipCellsForMutation. */
  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    for (Action action : actions) {
      skipCellsForMutation(action, cellScanner);
    }
  }

  /**
   * Advances the shared cell scanner past the cells of one failed mutation so subsequent
   * RegionActions read their own cells from the correct position.
   */
  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    try {
      if (action.hasMutation()) {
        MutationProto m = action.getMutation();
        if (m.hasAssociatedCellCount()) {
          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
            cellScanner.advance();
          }
        }
      }
    } catch (IOException e) {
      // No need to handle these individual Mutation level issues. Any way this entire RegionAction
      // marked as failed as we could not see the Region here. At client side the top level
      // RegionAction exception will be considered first.
      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
    }
  }

  /**
   * Mutate data in a table.
   * @param rpcc the RPC controller
   * @param request the mutate request
   */
  @Override
  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
    throws ServiceException {
    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
2967 HBaseRpcController controller = (HBaseRpcController) rpcc; 2968 CellScanner cellScanner = controller != null ? controller.cellScanner() : null; 2969 OperationQuota quota = null; 2970 RpcCallContext context = RpcServer.getCurrentCall().orElse(null); 2971 // Clear scanner so we are not holding on to reference across call. 2972 if (controller != null) { 2973 controller.setCellScanner(null); 2974 } 2975 try { 2976 checkOpen(); 2977 requestCount.increment(); 2978 rpcMutateRequestCount.increment(); 2979 HRegion region = getRegion(request.getRegion()); 2980 rejectIfInStandByState(region); 2981 MutateResponse.Builder builder = MutateResponse.newBuilder(); 2982 MutationProto mutation = request.getMutation(); 2983 if (!region.getRegionInfo().isMetaRegion()) { 2984 server.getMemStoreFlusher().reclaimMemStoreMemory(); 2985 } 2986 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; 2987 OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request); 2988 quota = getRpcQuotaManager().checkBatchQuota(region, operationType); 2989 ActivePolicyEnforcement spaceQuotaEnforcement = 2990 getSpaceQuotaManager().getActiveEnforcements(); 2991 2992 if (request.hasCondition()) { 2993 CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner, 2994 request.getCondition(), nonceGroup, spaceQuotaEnforcement, context); 2995 builder.setProcessed(result.isSuccess()); 2996 boolean clientCellBlockSupported = isClientCellBlockSupport(context); 2997 addResult(builder, result.getResult(), controller, clientCellBlockSupported); 2998 if (clientCellBlockSupported) { 2999 addSize(context, result.getResult()); 3000 } 3001 if (result.getMetrics() != null) { 3002 builder.setMetrics(ProtobufUtil.toQueryMetrics(result.getMetrics())); 3003 } 3004 } else { 3005 Result r = null; 3006 Boolean processed = null; 3007 MutationType type = mutation.getMutateType(); 3008 switch (type) { 3009 case APPEND: 3010 // TODO: this doesn't 
actually check anything.
            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
              context);
            break;
          case INCREMENT:
            // TODO: this doesn't actually check anything.
            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
              context);
            break;
          case PUT:
            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          case DELETE:
            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          default:
            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
        }
        if (processed != null) {
          builder.setProcessed(processed);
        }
        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
        addResult(builder, r, controller, clientCellBlockSupported);
        if (clientCellBlockSupported) {
          addSize(context, r);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      // Mutations can come with corrupted file system state; probe it so the server can
      // abort if the FS is actually bad, then surface the error to the client.
      server.checkFileSystem();
      throw new ServiceException(ie);
    } finally {
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Apply a single PUT mutation to the given region.
   * <p>
   * Enforces the per-cell size limit, the space-quota violation policy and the operation quota
   * before handing the {@link Put} to the region, and records put latency in the region server
   * metrics (when metrics are enabled).
   * @param region      target region, already resolved and checked by the caller
   * @param quota       operation quota already acquired by the caller; mutation size is charged
   *                    against it here
   * @param mutation    protobuf mutation of type PUT
   * @param cellScanner cell payload carried outside the protobuf message, may be null
   * @param spaceQuota  active space-quota enforcement policies for this server
   * @throws IOException if the quota/size checks reject the mutation or the region write fails
   */
  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Put put = ProtobufUtil.toPut(mutation, cellScanner);
    checkCellSizeLimit(region, put);
    spaceQuota.getPolicyEnforcement(region).check(put);
    quota.addMutation(put);
    region.put(put);

    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updatePut(region, after - before);
    }
  }

  /**
   * Apply a single DELETE mutation to the given region.
   * <p>
   * Mirrors {@code put(...)}: cell-size limit, space-quota policy and operation quota are
   * enforced before the region write, and delete latency is recorded in server metrics.
   * @param region      target region, already resolved and checked by the caller
   * @param quota       operation quota already acquired by the caller
   * @param mutation    protobuf mutation of type DELETE
   * @param cellScanner cell payload carried outside the protobuf message, may be null
   * @param spaceQuota  active space-quota enforcement policies for this server
   * @throws IOException if the quota/size checks reject the mutation or the region write fails
   */
  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
    checkCellSizeLimit(region, delete);
    spaceQuota.getPolicyEnforcement(region).check(delete);
    quota.addMutation(delete);
    region.delete(delete);

    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updateDelete(region, after - before);
    }
  }

  /**
   * Execute an atomic check-and-mutate against the region.
   * <p>
   * The condition and the action are decoded into a {@link CheckAndMutate}. The coprocessor
   * {@code preCheckAndMutate} hook runs first and may short-circuit the operation by returning a
   * non-null result; otherwise the region performs the operation (with nonce support for
   * idempotent retries) and {@code postCheckAndMutate} may rewrite the result. Latency is
   * recorded both under the generic checkAndMutate metric and under the type-specific
   * checkAndPut/checkAndDelete metric.
   * @param region     target region
   * @param quota      operation quota already acquired by the caller
   * @param mutation   protobuf mutation carrying the action (PUT or DELETE) and optional nonce
   * @param condition  the condition that must hold for the action to be applied
   * @param nonceGroup client nonce group, HConstants.NO_NONCE when absent
   * @param spaceQuota active space-quota enforcement policies
   * @param context    current RPC call context, may be null; used for block-bytes-scanned metrics
   * @return the result of the check-and-mutate, never null when the region call completes
   * @throws IOException on quota/size rejection or region failure
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
    quota.addMutation((Mutation) checkAndMutate.getAction());

    CheckAndMutateResult result = null;
    if (region.getCoprocessorHost() != null) {
      // A non-null result from the pre hook bypasses the region call entirely.
      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
    }
    if (result == null) {
      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
      if (region.getCoprocessorHost() != null) {
        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
      }
    }
    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      // Delta of block bytes scanned during this call only.
      long blockBytesScanned =
        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
      metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned);

      MutationType type = mutation.getMutateType();
      switch (type) {
        case PUT:
          metricsRegionServer.updateCheckAndPut(region, after - before);
          break;
        case DELETE:
          metricsRegionServer.updateCheckAndDelete(region, after - before);
          break;
        default:
          break;
      }
    }
    return result;
  }

  // This is used to keep compatible with the old client implementation. Consider remove it if we
  // decide to drop the support of the client that still sends close request to a region scanner
  // which has already been exhausted.
3128 @Deprecated 3129 private static final IOException SCANNER_ALREADY_CLOSED = new IOException() { 3130 3131 private static final long serialVersionUID = -4305297078988180130L; 3132 3133 @Override 3134 public synchronized Throwable fillInStackTrace() { 3135 return this; 3136 } 3137 }; 3138 3139 private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException { 3140 String scannerName = toScannerName(request.getScannerId()); 3141 RegionScannerHolder rsh = this.scanners.get(scannerName); 3142 if (rsh == null) { 3143 // just ignore the next or close request if scanner does not exists. 3144 Long lastCallSeq = closedScanners.getIfPresent(scannerName); 3145 if (lastCallSeq != null) { 3146 // Check the sequence number to catch if the last call was incorrectly retried. 3147 // The only allowed scenario is when the scanner is exhausted and one more scan 3148 // request arrives - in this case returning 0 rows is correct. 3149 if (request.hasNextCallSeq() && request.getNextCallSeq() != lastCallSeq + 1) { 3150 throw new OutOfOrderScannerNextException("Expected nextCallSeq for closed request: " 3151 + (lastCallSeq + 1) + " But the nextCallSeq got from client: " 3152 + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request)); 3153 } else { 3154 throw SCANNER_ALREADY_CLOSED; 3155 } 3156 } else { 3157 LOG.warn("Client tried to access missing scanner " + scannerName); 3158 throw new UnknownScannerException( 3159 "Unknown scanner '" + scannerName + "'. 
This can happen due to any of the following " 3160 + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " 3161 + "long wait between consecutive client checkins, c) Server may be closing down, " 3162 + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " 3163 + "possible fix would be increasing the value of" 3164 + "'hbase.client.scanner.timeout.period' configuration."); 3165 } 3166 } 3167 rejectIfInStandByState(rsh.r); 3168 RegionInfo hri = rsh.s.getRegionInfo(); 3169 // Yes, should be the same instance 3170 if (server.getOnlineRegion(hri.getRegionName()) != rsh.r) { 3171 String msg = "Region has changed on the scanner " + scannerName + ": regionName=" 3172 + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r; 3173 LOG.warn(msg + ", closing..."); 3174 scanners.remove(scannerName); 3175 try { 3176 rsh.s.close(); 3177 } catch (IOException e) { 3178 LOG.warn("Getting exception closing " + scannerName, e); 3179 } finally { 3180 try { 3181 server.getLeaseManager().cancelLease(scannerName); 3182 } catch (LeaseException e) { 3183 LOG.warn("Getting exception closing " + scannerName, e); 3184 } 3185 } 3186 throw new NotServingRegionException(msg); 3187 } 3188 return rsh; 3189 } 3190 3191 /** 3192 * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder 3193 * value. 3194 */ 3195 private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request, HRegion region, 3196 ScanResponse.Builder builder) throws IOException { 3197 rejectIfInStandByState(region); 3198 ClientProtos.Scan protoScan = request.getScan(); 3199 boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); 3200 Scan scan = ProtobufUtil.toScan(protoScan); 3201 // if the request doesn't set this, get the default region setting. 
3202 if (!isLoadingCfsOnDemandSet) { 3203 scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); 3204 } 3205 3206 if (!scan.hasFamilies()) { 3207 // Adding all families to scanner 3208 for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) { 3209 scan.addFamily(family); 3210 } 3211 } 3212 if (region.getCoprocessorHost() != null) { 3213 // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a 3214 // wrapper for the core created RegionScanner 3215 region.getCoprocessorHost().preScannerOpen(scan); 3216 } 3217 RegionScannerImpl coreScanner = region.getScanner(scan); 3218 Shipper shipper = coreScanner; 3219 RegionScanner scanner = coreScanner; 3220 try { 3221 if (region.getCoprocessorHost() != null) { 3222 scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); 3223 } 3224 } catch (Exception e) { 3225 // Although region coprocessor is for advanced users and they should take care of the 3226 // implementation to not damage the HBase system, closing the scanner on exception here does 3227 // not have any bad side effect, so let's do it 3228 scanner.close(); 3229 throw e; 3230 } 3231 long scannerId = scannerIdGenerator.generateNewScannerId(); 3232 builder.setScannerId(scannerId); 3233 builder.setMvccReadPoint(scanner.getMvccReadPoint()); 3234 builder.setTtl(scannerLeaseTimeoutPeriod); 3235 String scannerName = toScannerName(scannerId); 3236 3237 boolean fullRegionScan = 3238 !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region); 3239 3240 return new Pair<String, RegionScannerHolder>(scannerName, 3241 addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan)); 3242 } 3243 3244 /** 3245 * The returned String is used as key doing look up of outstanding Scanners in this Servers' 3246 * this.scanners, the Map of outstanding scanners and their current state. 3247 * @param scannerId A scanner long id. 
   * @return The long id as a String.
   */
  private static String toScannerName(long scannerId) {
    return Long.toString(scannerId);
  }

  /**
   * Validate the request's next-call sequence number against the scanner holder.
   * <p>
   * Must run before any lease check so an incorrectly retried call is rejected first.
   * @throws OutOfOrderScannerNextException when the client's sequence does not match
   */
  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
    throws OutOfOrderScannerNextException {
    // if nextCallSeq does not match throw Exception straight away. This needs to be
    // performed even before checking of Lease.
    // See HBASE-5974
    if (request.hasNextCallSeq()) {
      long callSeq = request.getNextCallSeq();
      if (!rsh.incNextCallSeq(callSeq)) {
        throw new OutOfOrderScannerNextException(
          "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: "
            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request)
            + "; region=" + rsh.r.getRegionInfo().getRegionNameAsString());
      }
    }
  }

  // Re-register a scanner lease that was removed while its request was being processed.
  private void addScannerLeaseBack(LeaseManager.Lease lease) {
    try {
      server.getLeaseManager().addLease(lease);
    } catch (LeaseStillHeldException e) {
      // should not happen as the scanner id is unique.
      throw new AssertionError(e);
    }
  }

  /**
   * Compute the absolute wall-clock deadline for the current scan RPC, or -1 when no time limit
   * should be enforced (heartbeats disabled, or no positive timeout configured).
   * @param rpcCall                current RPC call, may be null
   * @param controller             RPC controller carrying an optional per-call timeout
   * @param allowHeartbeatMessages whether the client can handle heartbeat responses; time limits
   *                               only make sense when it can
   * @return epoch-millis deadline, or -1 for "no limit"
   */
  // visible for testing only
  long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller,
    boolean allowHeartbeatMessages) {
    // Set the time limit to be half of the more restrictive timeout value (one of the
    // timeout values must be positive). In the event that both values are positive, the
    // more restrictive of the two is used to calculate the limit.
    if (allowHeartbeatMessages) {
      long now = EnvironmentEdgeManager.currentTime();
      long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now);
      if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) {
        long timeLimitDelta;
        if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) {
          timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout);
        } else {
          timeLimitDelta =
            scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout;
        }

        // Use half of whichever timeout value was more restrictive... But don't allow
        // the time limit to be less than the allowable minimum (could cause an
        // immediate timeout before scanning any data).
        timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
        return now + timeLimitDelta;
      }
    }
    // Default value of timeLimit is negative to indicate no timeLimit should be
    // enforced.
    return -1L;
  }

  /**
   * Remaining time budget for this RPC: the controller's per-call timeout (preferred) or the
   * server-wide rpcTimeout, minus time already spent in the queue; -1 when neither is positive.
   */
  private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) {
    long timeout;
    if (controller != null && controller.getCallTimeout() > 0) {
      timeout = controller.getCallTimeout();
    } else if (rpcTimeout > 0) {
      timeout = rpcTimeout;
    } else {
      return -1;
    }
    if (call != null) {
      timeout -= (now - call.getReceiveTime());
    }
    // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high.
    // return minimum value here in that case so we count this in calculating the final delta.
    return Math.max(minimumScanTimeLimitDelta, timeout);
  }

  // If the number of complete rows reached the row limit, mark the response as final
  // (moreResults=false) so the client stops this scan.
  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
    ScannerContext scannerContext, ScanResponse.Builder builder) {
    if (numOfCompleteRows >= limitOfRows) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows
          + " scannerContext: " + scannerContext);
      }
      builder.setMoreResults(false);
    }
  }

  /**
   * Core scan loop for a single scan RPC: repeatedly calls {@code RegionScanner.nextRaw} under
   * the scanner lock, accumulating up to {@code maxResults} Results into {@code results} while
   * honoring size, time, batch and row limits, emitting heartbeats/cursors where the client
   * supports them, and updating quota/metrics state. Sets moreResultsInRegion on the builder.
   * @param maxQuotaResultSize server/quota-imposed maximum result size (cell and block limits)
   * @param maxResults         caching limit: max number of Results for this call
   * @param limitOfRows        row limit for the whole scan, or -1 for none
   * @param results            output list; may already contain coprocessor-produced Results
   * @param rpcCall            current RPC call, may be null (e.g. internal invocation)
   * @param scanMetrics        server-side metrics accumulator, or null when not tracked
   */
  // return whether we have more results in region.
  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
    long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
    ScanResponse.Builder builder, RpcCall rpcCall, ServerSideScanMetrics scanMetrics)
    throws IOException {
    HRegion region = rsh.r;
    RegionScanner scanner = rsh.s;
    long maxResultSize;
    // The effective cell-size limit is the tighter of the scan's own max and the quota max.
    if (scanner.getMaxResultSize() > 0) {
      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
    } else {
      maxResultSize = maxQuotaResultSize;
    }
    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
    // arbitrary 32. TODO: keep record of general size of results being returned.
    ArrayList<Cell> values = new ArrayList<>(32);
    region.startRegionOperation(Operation.SCAN);
    long before = EnvironmentEdgeManager.currentTime();
    // Used to check if we've matched the row limit set on the Scan
    int numOfCompleteRows = 0;
    // Count of times we call nextRaw; can be > numOfCompleteRows.
    int numOfNextRawCalls = 0;
    try {
      int numOfResults = 0;
      synchronized (scanner) {
        // Results from a non-default replica are marked stale.
        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
        boolean clientHandlesPartials =
          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
        boolean clientHandlesHeartbeats =
          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();

        // On the server side we must ensure that the correct ordering of partial results is
        // returned to the client to allow them to properly reconstruct the partial results.
        // If the coprocessor host is adding to the result list, we cannot guarantee the
        // correct ordering of partial results and so we prevent partial results from being
        // formed.
        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
        boolean moreRows = false;

        // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
        // certain time threshold on the server. When the time threshold is exceeded, the
        // server stops the scan and sends back whatever Results it has accumulated within
        // that time period (may be empty). Since heartbeat messages have the potential to
        // create partial Results (in the event that the timeout occurs in the middle of a
        // row), we must only generate heartbeat messages when the client can handle both
        // heartbeats AND partials
        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;

        long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);

        final LimitScope sizeScope =
          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
        final LimitScope timeScope =
          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;

        // Configure with limits for this RPC. Set keep progress true since size progress
        // towards size limit should be kept between calls to nextRaw
        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
        // of heap size occupied by cells(being read). Cell data means its key and value parts.
        // maxQuotaResultSize - max results just from server side configuration and quotas, without
        // user's specified max. We use this for evaluating limits based on blocks (not cells).
        // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
        // cell or block size from maximum here so we adhere to total limits of request.
        // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
        // have accumulated block bytes. If not, this will be 0 for block size.
        long maxCellSize = maxResultSize;
        long maxBlockSize = maxQuotaResultSize;
        if (rpcCall != null) {
          maxBlockSize -= rpcCall.getBlockBytesScanned();
          maxCellSize -= rpcCall.getResponseCellSize();
        }

        contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
        contextBuilder.setBatchLimit(scanner.getBatch());
        contextBuilder.setTimeLimit(timeScope, timeLimit);
        contextBuilder.setTrackMetrics(scanMetrics != null);
        contextBuilder.setScanMetrics(scanMetrics);
        ScannerContext scannerContext = contextBuilder.build();
        boolean limitReached = false;
        long blockBytesScannedBefore = 0;
        while (numOfResults < maxResults) {
          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
          // batch limit is a limit on the number of cells per Result. Thus, if progress is
          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
          // reset the batch progress between nextRaw invocations since we don't want the
          // batch progress from previous calls to affect future calls
          scannerContext.setBatchProgress(0);
          assert values.isEmpty();

          // Collect values to be returned here
          moreRows = scanner.nextRaw(values, scannerContext);

          // Per-iteration block bytes: the context tracks a cumulative total.
          long blockBytesScanned = scannerContext.getBlockSizeProgress() - blockBytesScannedBefore;
          blockBytesScannedBefore = scannerContext.getBlockSizeProgress();

          if (rpcCall == null) {
            // When there is no RpcCallContext,copy EC to heap, then the scanner would close,
            // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
            // buffers.See more details in HBASE-26036.
            CellUtil.cloneIfNecessary(values);
          }
          numOfNextRawCalls++;

          if (!values.isEmpty()) {
            if (limitOfRows > 0) {
              // First we need to check if the last result is partial and we have a row change. If
              // so then we need to increase the numOfCompleteRows.
              if (results.isEmpty()) {
                if (
                  rsh.rowOfLastPartialResult != null
                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              } else {
                Result lastResult = results.get(results.size() - 1);
                if (
                  lastResult.mayHaveMoreCellsInRow()
                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              }
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);

            if (request.getScan().getQueryMetricsEnabled()) {
              builder.addQueryMetrics(ClientProtos.QueryMetrics.newBuilder()
                .setBlockBytesScanned(blockBytesScanned).build());
            }

            results.add(r);
            numOfResults++;
            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
              numOfCompleteRows++;
              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
          } else if (!moreRows && !results.isEmpty()) {
            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
            // last result is false. Otherwise it's possible that: the first nextRaw returned
            // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
            // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
            // return with moreRows=false, which means moreResultsInRegion would be false, it will
            // be a contradictory state (HBASE-21206).
            int lastIdx = results.size() - 1;
            Result r = results.get(lastIdx);
            if (r.mayHaveMoreCellsInRow()) {
              results.set(lastIdx, ClientInternalHelper.createResult(
                ClientInternalHelper.getExtendedRawCells(r), r.getExists(), r.isStale(), false));
            }
          }
          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
          boolean resultsLimitReached = numOfResults >= maxResults;
          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

          if (limitReached || !moreRows) {
            // With block size limit, we may exceed size limit without collecting any results.
            // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
            // or cursor if results were collected, for example for cell size or heap size limits.
            boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
            // We only want to mark a ScanResponse as a heartbeat message in the event that
            // there are more values to be read server side. If there aren't more values,
            // marking it as a heartbeat is wasteful because the client will need to issue
            // another ScanRequest only to realize that they already have all the values
            if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
              // Heartbeat messages occur when the time limit has been reached, or size limit has
              // been reached before collecting any results. This can happen for heavily filtered
              // scans which scan over too many blocks.
              builder.setHeartbeatMessage(true);
              if (rsh.needCursor) {
                Cell cursorCell = scannerContext.getLastPeekedCell();
                if (cursorCell != null) {
                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
                }
              }
            }
            break;
          }
          values.clear();
        }
        if (rpcCall != null) {
          rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
        }
        builder.setMoreResultsInRegion(moreRows);
        // Check to see if the client requested that we track metrics server side. If the
        // client requested metrics, retrieve the metrics from the scanner context.
        if (scanMetrics != null) {
          // rather than increment yet another counter in StoreScanner, just set the value here
          // from block size progress before writing into the response
          scanMetrics.setCounter(ServerSideScanMetrics.BLOCK_BYTES_SCANNED_KEY_METRIC_NAME,
            scannerContext.getBlockSizeProgress());
        }
      }
    } finally {
      region.closeRegionOperation();
      // Update serverside metrics, even on error.
      long end = EnvironmentEdgeManager.currentTime();

      long responseCellSize = 0;
      long blockBytesScanned = 0;
      if (rpcCall != null) {
        responseCellSize = rpcCall.getResponseCellSize();
        blockBytesScanned = rpcCall.getBlockBytesScanned();
        rsh.updateBlockBytesScanned(blockBytesScanned);
      }
      region.getMetrics().updateScan();
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned);
        metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls);
      }
    }
    // coprocessor postNext hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
    }
  }

  /**
   * Scan data in a table.
3572 * @param controller the RPC controller 3573 * @param request the scan request 3574 */ 3575 @Override 3576 public ScanResponse scan(final RpcController controller, final ScanRequest request) 3577 throws ServiceException { 3578 if (controller != null && !(controller instanceof HBaseRpcController)) { 3579 throw new UnsupportedOperationException( 3580 "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller); 3581 } 3582 if (!request.hasScannerId() && !request.hasScan()) { 3583 throw new ServiceException( 3584 new DoNotRetryIOException("Missing required input: scannerId or scan")); 3585 } 3586 try { 3587 checkOpen(); 3588 } catch (IOException e) { 3589 if (request.hasScannerId()) { 3590 String scannerName = toScannerName(request.getScannerId()); 3591 if (LOG.isDebugEnabled()) { 3592 LOG.debug( 3593 "Server shutting down and client tried to access missing scanner " + scannerName); 3594 } 3595 final LeaseManager leaseManager = server.getLeaseManager(); 3596 if (leaseManager != null) { 3597 try { 3598 leaseManager.cancelLease(scannerName); 3599 } catch (LeaseException le) { 3600 // No problem, ignore 3601 if (LOG.isTraceEnabled()) { 3602 LOG.trace("Un-able to cancel lease of scanner. 
It could already be closed."); 3603 } 3604 } 3605 } 3606 } 3607 throw new ServiceException(e); 3608 } 3609 boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics(); 3610 ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(trackMetrics); 3611 if (trackMetrics) { 3612 ThreadLocalServerSideScanMetrics.reset(); 3613 } 3614 requestCount.increment(); 3615 rpcScanRequestCount.increment(); 3616 RegionScannerContext rsx; 3617 ScanResponse.Builder builder = ScanResponse.newBuilder(); 3618 try { 3619 rsx = checkQuotaAndGetRegionScannerContext(request, builder); 3620 } catch (IOException e) { 3621 if (e == SCANNER_ALREADY_CLOSED) { 3622 // Now we will close scanner automatically if there are no more results for this region but 3623 // the old client will still send a close request to us. Just ignore it and return. 3624 return builder.build(); 3625 } 3626 throw new ServiceException(e); 3627 } 3628 String scannerName = rsx.scannerName; 3629 RegionScannerHolder rsh = rsx.holder; 3630 OperationQuota quota = rsx.quota; 3631 if (rsh.fullRegionScan) { 3632 rpcFullScanRequestCount.increment(); 3633 } 3634 HRegion region = rsh.r; 3635 LeaseManager.Lease lease; 3636 try { 3637 // Remove lease while its being processed in server; protects against case 3638 // where processing of request takes > lease expiration time. or null if none found. 
3639 lease = server.getLeaseManager().removeLease(scannerName); 3640 } catch (LeaseException e) { 3641 throw new ServiceException(e); 3642 } 3643 if (request.hasRenew() && request.getRenew()) { 3644 // add back and return 3645 addScannerLeaseBack(lease); 3646 try { 3647 checkScanNextCallSeq(request, rsh); 3648 } catch (OutOfOrderScannerNextException e) { 3649 throw new ServiceException(e); 3650 } 3651 return builder.build(); 3652 } 3653 try { 3654 checkScanNextCallSeq(request, rsh); 3655 } catch (OutOfOrderScannerNextException e) { 3656 addScannerLeaseBack(lease); 3657 throw new ServiceException(e); 3658 } 3659 // Now we have increased the next call sequence. If we give client an error, the retry will 3660 // never success. So we'd better close the scanner and return a DoNotRetryIOException to client 3661 // and then client will try to open a new scanner. 3662 boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false; 3663 int rows; // this is scan.getCaching 3664 if (request.hasNumberOfRows()) { 3665 rows = request.getNumberOfRows(); 3666 } else { 3667 rows = closeScanner ? 0 : 1; 3668 } 3669 RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null); 3670 // now let's do the real scan. 3671 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize()); 3672 RegionScanner scanner = rsh.s; 3673 // this is the limit of rows for this scan, if we the number of rows reach this value, we will 3674 // close the scanner. 3675 int limitOfRows; 3676 if (request.hasLimitOfRows()) { 3677 limitOfRows = request.getLimitOfRows(); 3678 } else { 3679 limitOfRows = -1; 3680 } 3681 boolean scannerClosed = false; 3682 try { 3683 List<Result> results = new ArrayList<>(Math.min(rows, 512)); 3684 ServerSideScanMetrics scanMetrics = trackMetrics ? new ServerSideScanMetrics() : null; 3685 if (rows > 0) { 3686 boolean done = false; 3687 // Call coprocessor. Get region info from scanner. 
3688 if (region.getCoprocessorHost() != null) { 3689 Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows); 3690 if (!results.isEmpty()) { 3691 for (Result r : results) { 3692 // add cell size from CP results so we can track response size and update limits 3693 // when calling scan below if !done. We'll also have tracked block size if the CP 3694 // got results from hbase, since StoreScanner tracks that for all calls automatically. 3695 addSize(rpcCall, r); 3696 } 3697 } 3698 if (bypass != null && bypass.booleanValue()) { 3699 done = true; 3700 } 3701 } 3702 if (!done) { 3703 scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows, 3704 results, builder, rpcCall, scanMetrics); 3705 } else { 3706 builder.setMoreResultsInRegion(!results.isEmpty()); 3707 } 3708 } else { 3709 // This is a open scanner call with numberOfRow = 0, so set more results in region to true. 3710 builder.setMoreResultsInRegion(true); 3711 } 3712 3713 quota.addScanResult(results); 3714 addResults(builder, results, (HBaseRpcController) controller, 3715 RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()), 3716 isClientCellBlockSupport(rpcCall)); 3717 if (scanner.isFilterDone() && results.isEmpty()) { 3718 // If the scanner's filter - if any - is done with the scan 3719 // only set moreResults to false if the results is empty. This is used to keep compatible 3720 // with the old scan implementation where we just ignore the returned results if moreResults 3721 // is false. Can remove the isEmpty check after we get rid of the old implementation. 3722 builder.setMoreResults(false); 3723 } 3724 // Later we may close the scanner depending on this flag so here we need to make sure that we 3725 // have already set this flag. 3726 assert builder.hasMoreResultsInRegion(); 3727 // we only set moreResults to false in the above code, so set it to true if we haven't set it 3728 // yet. 
// --- tail of scan(...): finalize the ScanResponse, handle errors, and manage scanner
// --- lifecycle. (Method head is outside this view.)

// moreResults defaults to true when unset: absence of the field means the client
// scanner should keep going.
if (!builder.hasMoreResults()) {
  builder.setMoreResults(true);
}
if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
  // Record the last cell of the last result if it is a partial result
  // We need this to calculate the complete rows we have returned to client as the
  // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
  // current row. We may filter out all the remaining cells for the current row and just
  // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
  // check for row change.
  Result lastResult = results.get(results.size() - 1);
  if (lastResult.mayHaveMoreCellsInRow()) {
    rsh.rowOfLastPartialResult = lastResult.getRow();
  } else {
    rsh.rowOfLastPartialResult = null;
  }
}
// Close the scanner when the scan is exhausted (either globally or within this region)
// or when the client explicitly asked for close.
if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
  scannerClosed = true;
  closeScanner(region, scanner, scannerName, rpcCall, false);
}

// There's no point returning to a timed out client. Throwing ensures scanner is closed
if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) {
  throw new TimeoutIOException("Client deadline exceeded, cannot return results");
}

// Attach server-side scan metrics to the response when the client requested them.
if (scanMetrics != null) {
  if (rpcCall != null) {
    // RPC-level timings: processing time since the call started, and how long the call
    // waited in the RPC queue before processing began.
    long rpcScanTime = EnvironmentEdgeManager.currentTime() - rpcCall.getStartTime();
    long rpcQueueWaitTime = rpcCall.getStartTime() - rpcCall.getReceiveTime();
    scanMetrics.addToCounter(ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME,
      rpcScanTime);
    scanMetrics.addToCounter(ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME,
      rpcQueueWaitTime);
  }
  // Fold in the metrics accumulated in thread-locals during this call.
  ThreadLocalServerSideScanMetrics.populateServerSideScanMetrics(scanMetrics);
  Map<String, Long> metrics = scanMetrics.getMetricsMap();
  ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
  NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();

  // The single pairBuilder is reused; each setName/setValue pair is snapshotted by build().
  for (Entry<String, Long> entry : metrics.entrySet()) {
    pairBuilder.setName(entry.getKey());
    pairBuilder.setValue(entry.getValue());
    metricBuilder.addMetrics(pairBuilder.build());
  }

  builder.setScanMetrics(metricBuilder.build());
}

return builder.build();
} catch (IOException e) {
  try {
    // scanner is closed here
    scannerClosed = true;
    // The scanner state might be left in a dirty state, so we will tell the Client to
    // fail this RPC and close the scanner while opening up another one from the start of
    // row that the client has last seen.
    closeScanner(region, scanner, scannerName, rpcCall, true);

    // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
    // used in two different semantics.
    // (1) The first is to close the client scanner and bubble up the exception all the way
    // to the application. This is preferred when the exception is really un-recoverable
    // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
    // bucket usually.
    // (2) Second semantics is to close the current region scanner only, but continue the
    // client scanner by overriding the exception. This is usually UnknownScannerException,
    // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
    // application-level ClientScanner has to continue without bubbling up the exception to
    // the client. See ClientScanner code to see how it deals with these special exceptions.
    if (e instanceof DoNotRetryIOException) {
      throw e;
    }

    // If it is a FileNotFoundException, wrap as a
    // DoNotRetryIOException. This can avoid the retry in ClientScanner.
    if (e instanceof FileNotFoundException) {
      throw new DoNotRetryIOException(e);
    }

    // We closed the scanner already. Instead of throwing the IOException, and client
    // retrying with the same scannerId only to get USE on the next RPC, we directly throw
    // a special exception to save an RPC.
    // NOTE(review): rpcCall is null-checked elsewhere in this method but dereferenced
    // unconditionally here — confirm this error path cannot be reached with a null rpcCall.
    // Also note the version check is 1.4 while the message below says "older than 1.3".
    if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
      // 1.4.0+ clients know how to handle
      throw new ScannerResetException("Scanner is closed on the server-side", e);
    } else {
      // older clients do not know about SRE. Just throw USE, which they will handle
      throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
        + " scanner state for clients older than 1.3.", e);
    }
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
} finally {
  if (!scannerClosed) {
    // Adding resets expiration time on lease.
    // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
    if (rpcCall != null) {
      rpcCall.setCallBack(rsh.shippedCallback);
    } else {
      // If context is null, here we call rsh.shippedCallback directly to reuse the logic in
      // rsh.shippedCallback to release the internal resources in rsh, and lease is also added
      // back to regionserver's LeaseManager in rsh.shippedCallback.
      runShippedCallback(rsh);
    }
  }
  // Always release the quota taken for this call, whether the scan succeeded or not.
  quota.close();
}
}

/**
 * Runs the scanner holder's shipped callback synchronously, converting any IOException into a
 * ServiceException suitable for returning to the RPC layer. Used when there is no RpcCallContext
 * to attach the callback to.
 */
private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
  assert rsh.shippedCallback != null;
  try {
    rsh.shippedCallback.run();
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}

/**
 * Closes a region scanner and removes it from the live-scanners map.
 * <p>
 * Coprocessors get a chance to bypass the close via preScannerClose. When a call context is
 * present the actual close is deferred to the context's close callback; otherwise the scanner is
 * closed inline. On a clean (non-error) close the scanner's last call sequence is remembered in
 * closedScanners so late-arriving client retries can be answered.
 * @param context the RPC call context, may be null (e.g. internal close)
 * @param isError true when closing due to a failure; skips recording in closedScanners
 */
private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
  RpcCallContext context, boolean isError) throws IOException {
  if (region.getCoprocessorHost() != null) {
    if (region.getCoprocessorHost().preScannerClose(scanner)) {
      // bypass the actual close.
      return;
    }
  }
  RegionScannerHolder rsh = scanners.remove(scannerName);
  if (rsh != null) {
    if (context != null) {
      // Defer the real close until the RPC response has been shipped.
      context.setCallBack(rsh.closeCallBack);
    } else {
      rsh.s.close();
    }
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postScannerClose(scanner);
    }
    if (!isError) {
      closedScanners.put(scannerName, rsh.getNextCallSeq());
    }
  }
}

/**
 * Executes a region-server-level coprocessor service call. Delegates to the server after the
 * standard pre-check.
 */
@Override
public CoprocessorServiceResponse execRegionServerService(RpcController controller,
  CoprocessorServiceRequest request) throws ServiceException {
  rpcPreCheck("execRegionServerService");
  return server.execRegionServerService(controller, request);
}

/**
 * Returns the space quota snapshots currently held by this region server's quota manager, one
 * entry per table. Returns an empty response when no quota manager is installed.
 */
@Override
public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
  GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
  try {
    final RegionServerSpaceQuotaManager manager = server.getRegionServerSpaceQuotaManager();
    final GetSpaceQuotaSnapshotsResponse.Builder builder =
      GetSpaceQuotaSnapshotsResponse.newBuilder();
    if (manager != null) {
      // copyQuotaSnapshots hands back a copy, so iterating here is safe.
      final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
      for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
        builder.addSnapshots(TableQuotaSnapshot.newBuilder()
          .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
          .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build());
      }
    }
    return builder.build();
  } catch (Exception e) {
    throw new ServiceException(e);
  }
}

/**
 * Evicts the block cache entries for the requested regions and reports per-region eviction
 * stats, including any per-region exceptions encountered while evicting.
 */
@Override
public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
  ClearRegionBlockCacheRequest request) throws ServiceException {
  try {
    rpcPreCheck("clearRegionBlockCache");
    ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder();
CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
server.getRegionServerCoprocessorHost().preClearRegionBlockCache();
List<HRegion> regions = getRegions(request.getRegionList(), stats);
for (HRegion region : regions) {
  try {
    // append returns a new builder combining the accumulated stats with this region's.
    stats = stats.append(this.server.clearRegionBlockCache(region));
  } catch (Exception e) {
    // A failure for one region is recorded in the stats rather than failing the whole call.
    stats.addException(region.getRegionInfo().getRegionName(), e);
  }
}
// 0L when no block cache is configured on this server.
stats.withMaxCacheSize(server.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
server.getRegionServerCoprocessorHost().postClearRegionBlockCache(stats.build());
return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
} catch (IOException e) {
  throw new ServiceException(e);
}
}

/**
 * Schedules an AssignRegionHandler for every region in an OpenRegionRequest.
 * <p>
 * Table descriptors are looked up through the per-batch cache first to avoid repeated
 * (typically filesystem-backed) descriptor reads.
 * @param tdCache per-request cache of table descriptors, shared across open-region infos
 */
private void executeOpenRegionProcedures(OpenRegionRequest request,
  Map<TableName, TableDescriptor> tdCache) {
  // -1 sentinels mean the master did not supply the corresponding timestamp.
  long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
  long initiatingMasterActiveTime =
    request.hasInitiatingMasterActiveTime() ? request.getInitiatingMasterActiveTime() : -1;
  for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
    RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
    TableName tableName = regionInfo.getTable();
    TableDescriptor tableDesc = tdCache.get(tableName);
    if (tableDesc == null) {
      try {
        tableDesc = server.getTableDescriptors().get(regionInfo.getTable());
      } catch (IOException e) {
        // Here we do not fail the whole method since we also need deal with other
        // procedures, and we can not ignore this one, so we still schedule a
        // AssignRegionHandler and it will report back to master if we still can not get the
        // TableDescriptor.
        LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
          regionInfo.getTable(), e);
      }
      if (tableDesc != null) {
        tdCache.put(tableName, tableDesc);
      }
    }
    if (regionOpenInfo.getFavoredNodesCount() > 0) {
      server.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
        regionOpenInfo.getFavoredNodesList());
    }
    long procId = regionOpenInfo.getOpenProcId();
    // submitRegionProcedure de-duplicates: only schedule the handler when this procId has not
    // been submitted already.
    if (server.submitRegionProcedure(procId)) {
      server.getExecutorService().submit(AssignRegionHandler.create(server, regionInfo, procId,
        tableDesc, masterSystemTime, initiatingMasterActiveTime));
    }
  }
}

/**
 * Schedules an UnassignRegionHandler for the region named in a CloseRegionRequest.
 */
private void executeCloseRegionProcedures(CloseRegionRequest request) {
  String encodedName;
  long initiatingMasterActiveTime =
    request.hasInitiatingMasterActiveTime() ? request.getInitiatingMasterActiveTime() : -1;
  try {
    encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
  } catch (DoNotRetryIOException e) {
    // The region specifier comes from the master and is expected to always decode.
    throw new UncheckedIOException("Should not happen", e);
  }
  ServerName destination = request.hasDestinationServer()
    ?
ProtobufUtil.toServerName(request.getDestinationServer())
: null;
long procId = request.getCloseProcId();
boolean evictCache = request.getEvictCache();
// submitRegionProcedure de-duplicates repeated procedure submissions by procId.
if (server.submitRegionProcedure(procId)) {
  server.getExecutorService().submit(UnassignRegionHandler.create(server, encodedName, procId,
    false, destination, evictCache, initiatingMasterActiveTime));
}
}

/**
 * Instantiates and executes a single master-initiated remote procedure.
 * <p>
 * Instantiation failures are reported back to the master via remoteProcedureComplete rather
 * than thrown, so one bad procedure does not abort the batch.
 */
private void executeProcedures(RemoteProcedureRequest request) {
  RSProcedureCallable callable;
  try {
    // The procedure class is named by the master; load it reflectively and build it with the
    // no-arg constructor.
    callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
      .getDeclaredConstructor().newInstance();
  } catch (Exception e) {
    LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(),
      request.getProcId(), e);
    server.remoteProcedureComplete(request.getProcId(), request.getInitiatingMasterActiveTime(),
      e, null);
    return;
  }
  callable.init(request.getProcData().toByteArray(), server);
  LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
  server.executeProcedure(request.getProcId(), request.getInitiatingMasterActiveTime(), callable);
}

/**
 * Master-to-regionserver entry point for executing batched procedures: region opens, region
 * closes, and generic remote procedures, bracketed by the pre/post coprocessor hooks.
 */
@Override
@QosPriority(priority = HConstants.ADMIN_QOS)
public ExecuteProceduresResponse executeProcedures(RpcController controller,
  ExecuteProceduresRequest request) throws ServiceException {
  try {
    checkOpen();
    throwOnWrongStartCode(request);
    server.getRegionServerCoprocessorHost().preExecuteProcedures();
    if (request.getOpenRegionCount() > 0) {
      // Avoid reading from the TableDescriptor every time (usually it will read from the file
      // system)
      Map<TableName, TableDescriptor> tdCache = new HashMap<>();
      request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
    }
    if (request.getCloseRegionCount() > 0) {
      request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
    }
    if (request.getProcCount() > 0) {
      request.getProcList().forEach(this::executeProcedures);
    }
    server.getRegionServerCoprocessorHost().postExecuteProcedures();
    return ExecuteProceduresResponse.getDefaultInstance();
  } catch (IOException e) {
    throw new ServiceException(e);
  }
}

/**
 * Returns every bootstrap node known to this region server.
 */
@Override
public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller,
  GetAllBootstrapNodesRequest request) throws ServiceException {
  GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder();
  // NOTE(review): the lambda parameter `server` shadows the `server` field; consider renaming
  // it (e.g. `node`) for clarity.
  server.getBootstrapNodes()
    .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server)));
  return builder.build();
}

/**
 * (Re)reads the guardrail settings that may change at runtime: the batch-rows warn threshold,
 * whether oversized batches are rejected, and the server-side max scanner result size.
 */
private void setReloadableGuardrails(Configuration conf) {
  rowSizeWarnThreshold =
    conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
  rejectRowsWithSizeOverThreshold =
    conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
  maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
    HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
}

/**
 * Applies a live configuration change: runs the superclass handling, then refreshes the
 * reloadable guardrails from the new configuration.
 */
@Override
public void onConfigurationChange(Configuration conf) {
  super.onConfigurationChange(conf);
  setReloadableGuardrails(conf);
}

/**
 * Lists the store files that are fully cached in the block cache, or an empty list when no
 * block cache (or no fully-cached-files view) is available.
 */
@Override
public GetCachedFilesListResponse getCachedFilesList(RpcController controller,
  GetCachedFilesListRequest request) throws ServiceException {
  GetCachedFilesListResponse.Builder responseBuilder = GetCachedFilesListResponse.newBuilder();
  List<String> fullyCachedFiles = new ArrayList<>();
  server.getBlockCache().flatMap(BlockCache::getFullyCachedFiles).ifPresent(fcf -> {
    fullyCachedFiles.addAll(fcf.keySet());
  });
  return
responseBuilder.addAllCachedFiles(fullyCachedFiles).build(); 4059 } 4060 4061 RegionScannerContext checkQuotaAndGetRegionScannerContext(ScanRequest request, 4062 ScanResponse.Builder builder) throws IOException { 4063 if (request.hasScannerId()) { 4064 // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000 4065 // for more details. 4066 long scannerId = request.getScannerId(); 4067 builder.setScannerId(scannerId); 4068 String scannerName = toScannerName(scannerId); 4069 RegionScannerHolder rsh = getRegionScanner(request); 4070 OperationQuota quota = 4071 getRpcQuotaManager().checkScanQuota(rsh.r, request, maxScannerResultSize, 4072 rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference()); 4073 return new RegionScannerContext(scannerName, rsh, quota); 4074 } 4075 4076 HRegion region = getRegion(request.getRegion()); 4077 OperationQuota quota = 4078 getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize, 0L, 0L); 4079 Pair<String, RegionScannerHolder> pair = newRegionScanner(request, region, builder); 4080 return new RegionScannerContext(pair.getFirst(), pair.getSecond(), quota); 4081 } 4082}