001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.UncheckedIOException; 024import java.net.BindException; 025import java.net.InetAddress; 026import java.net.InetSocketAddress; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.Iterator; 032import java.util.List; 033import java.util.Map; 034import java.util.Map.Entry; 035import java.util.NavigableMap; 036import java.util.Set; 037import java.util.TreeSet; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentMap; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicBoolean; 042import java.util.concurrent.atomic.AtomicLong; 043import java.util.concurrent.atomic.LongAdder; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.fs.FileSystem; 046import org.apache.hadoop.fs.Path; 047import org.apache.hadoop.hbase.CacheEvictionStats; 048import 
org.apache.hadoop.hbase.CacheEvictionStatsBuilder; 049import org.apache.hadoop.hbase.Cell; 050import org.apache.hadoop.hbase.CellScanner; 051import org.apache.hadoop.hbase.CellUtil; 052import org.apache.hadoop.hbase.DoNotRetryIOException; 053import org.apache.hadoop.hbase.DroppedSnapshotException; 054import org.apache.hadoop.hbase.ExtendedCellScannable; 055import org.apache.hadoop.hbase.ExtendedCellScanner; 056import org.apache.hadoop.hbase.HBaseIOException; 057import org.apache.hadoop.hbase.HBaseRpcServicesBase; 058import org.apache.hadoop.hbase.HConstants; 059import org.apache.hadoop.hbase.MultiActionResultTooLarge; 060import org.apache.hadoop.hbase.NotServingRegionException; 061import org.apache.hadoop.hbase.PrivateCellUtil; 062import org.apache.hadoop.hbase.RegionTooBusyException; 063import org.apache.hadoop.hbase.Server; 064import org.apache.hadoop.hbase.ServerName; 065import org.apache.hadoop.hbase.TableName; 066import org.apache.hadoop.hbase.UnknownScannerException; 067import org.apache.hadoop.hbase.client.Append; 068import org.apache.hadoop.hbase.client.CheckAndMutate; 069import org.apache.hadoop.hbase.client.CheckAndMutateResult; 070import org.apache.hadoop.hbase.client.ClientInternalHelper; 071import org.apache.hadoop.hbase.client.Delete; 072import org.apache.hadoop.hbase.client.Durability; 073import org.apache.hadoop.hbase.client.Get; 074import org.apache.hadoop.hbase.client.Increment; 075import org.apache.hadoop.hbase.client.Mutation; 076import org.apache.hadoop.hbase.client.OperationWithAttributes; 077import org.apache.hadoop.hbase.client.Put; 078import org.apache.hadoop.hbase.client.QueryMetrics; 079import org.apache.hadoop.hbase.client.RegionInfo; 080import org.apache.hadoop.hbase.client.RegionReplicaUtil; 081import org.apache.hadoop.hbase.client.Result; 082import org.apache.hadoop.hbase.client.Row; 083import org.apache.hadoop.hbase.client.Scan; 084import org.apache.hadoop.hbase.client.TableDescriptor; 085import 
org.apache.hadoop.hbase.client.VersionInfoUtil; 086import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics; 087import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 088import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 089import org.apache.hadoop.hbase.exceptions.ScannerResetException; 090import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 091import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 092import org.apache.hadoop.hbase.io.ByteBuffAllocator; 093import org.apache.hadoop.hbase.io.hfile.BlockCache; 094import org.apache.hadoop.hbase.ipc.HBaseRpcController; 095import org.apache.hadoop.hbase.ipc.PriorityFunction; 096import org.apache.hadoop.hbase.ipc.QosPriority; 097import org.apache.hadoop.hbase.ipc.RpcCall; 098import org.apache.hadoop.hbase.ipc.RpcCallContext; 099import org.apache.hadoop.hbase.ipc.RpcCallback; 100import org.apache.hadoop.hbase.ipc.RpcServer; 101import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 102import org.apache.hadoop.hbase.ipc.RpcServerFactory; 103import org.apache.hadoop.hbase.ipc.RpcServerInterface; 104import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 105import org.apache.hadoop.hbase.ipc.ServerRpcController; 106import org.apache.hadoop.hbase.net.Address; 107import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 108import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; 109import org.apache.hadoop.hbase.quotas.OperationQuota; 110import org.apache.hadoop.hbase.quotas.QuotaUtil; 111import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 112import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 113import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 114import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; 115import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease; 116import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException; 117import 
org.apache.hadoop.hbase.regionserver.Region.Operation; 118import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 119import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 120import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler; 121import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; 122import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; 123import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; 124import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; 125import org.apache.hadoop.hbase.replication.ReplicationUtils; 126import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker; 127import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker; 128import org.apache.hadoop.hbase.security.Superusers; 129import org.apache.hadoop.hbase.security.access.Permission; 130import org.apache.hadoop.hbase.util.Bytes; 131import org.apache.hadoop.hbase.util.DNS; 132import org.apache.hadoop.hbase.util.DNS.ServerType; 133import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 134import org.apache.hadoop.hbase.util.Pair; 135import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 136import org.apache.hadoop.hbase.wal.WAL; 137import org.apache.hadoop.hbase.wal.WALEdit; 138import org.apache.hadoop.hbase.wal.WALKey; 139import org.apache.hadoop.hbase.wal.WALSplitUtil; 140import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 141import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 142import org.apache.yetus.audience.InterfaceAudience; 143import org.slf4j.Logger; 144import org.slf4j.LoggerFactory; 145 146import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 147import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 148import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 149import 
org.apache.hbase.thirdparty.com.google.common.collect.Lists; 150import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 151import org.apache.hbase.thirdparty.com.google.protobuf.Message; 152import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 153import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 154import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 155import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 156import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 157 158import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 159import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 160import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 173import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 193import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 
214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 
236import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 249 250/** 251 * Implements the regionserver RPC services. 252 */ 253@InterfaceAudience.Private 254public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer> 255 implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface { 256 257 private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class); 258 259 /** RPC scheduler to use for the region server. */ 260 public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = 261 "hbase.region.server.rpc.scheduler.factory.class"; 262 263 /** 264 * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. 
This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Upper bound on the result size a single scan RPC may accumulate. Volatile because it is
  // read per-request; NOTE(review): the update path is not visible in this chunk — presumably
  // refreshed on configuration change.
  private volatile long maxScannerResultSize;

  // Produces the unique ids used as keys into the scanners map below.
  private ScannerIdGenerator scannerIdGenerator;
  // Currently open client scanners, keyed by scanner name (the stringified scanner id).
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name and last sequence number of a closed scanner for a while. This is used
  // to keep compatible for old clients which may send next or close request to a region
  // scanner which has already been exhausted. The entries will be removed automatically
  // after scannerLeaseTimeoutPeriod.
  private final Cache<String, Long> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private volatile int rowSizeWarnThreshold;
  /*
   * Whether we should reject requests with very high no of rows i.e. beyond threshold defined by
   * rowSizeWarnThreshold
   */
  private volatile boolean rejectRowsWithSizeOverThreshold;

  // True while a clearCompactionQueues request is in flight; used to refuse concurrent requests.
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below booleans
   * to selectively enable/disable these services (Rare is the case where you would ever turn off
   * one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
    "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
    "hbase.regionserver.client.executorService";
  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
    "hbase.regionserver.client.meta.executorService";
  public static final String REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG =
    "hbase.regionserver.bootstrap.nodes.executorService";

  /**
   * An Rpc callback for closing a RegionScanner.
   */
  private static final class RegionScannerCloseCallBack implements RpcCallback {

    private final RegionScanner scanner;

    public RegionScannerCloseCallBack(RegionScanner scanner) {
      this.scanner = scanner;
    }

    @Override
    public void run() throws IOException {
      // Close when the RPC call completes; any IOException propagates to the RPC layer.
      this.scanner.close();
    }
  }

  /**
   * An Rpc callback for doing shipped() call on a RegionScanner. Non-static: it reads the outer
   * {@code scanners} map and the outer {@code server}'s lease manager.
   */
  private class RegionScannerShippedCallBack implements RpcCallback {
    private final String scannerName;
    private final Shipper shipper;
    private final Lease lease;

    public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
      this.scannerName = scannerName;
      this.shipper = shipper;
      this.lease = lease;
    }

    @Override
    public void run() throws IOException {
      this.shipper.shipped();
      // We're done. On way out re-add the above removed lease. The lease was temp removed for this
      // Rpc call and we are at end of the call now. Time to add it back.
      // Only re-add if the scanner is still registered — it may have been closed meanwhile.
      if (scanners.containsKey(scannerName)) {
        if (lease != null) {
          server.getLeaseManager().addLease(lease);
        }
      }
    }
  }

  /**
   * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
   * completion of multiGets.
   */
  static class RegionScannersCloseCallBack implements RpcCallback {
    private final List<RegionScanner> scanners = new ArrayList<>();

    public void addScanner(RegionScanner scanner) {
      this.scanners.add(scanner);
    }

    @Override
    public void run() {
      // Best-effort close of every registered scanner: one failure must not prevent
      // the remaining scanners from being closed, so each close is caught and logged.
      for (RegionScanner scanner : scanners) {
        try {
          scanner.close();
        } catch (IOException e) {
          LOG.error("Exception while closing the scanner " + scanner, e);
        }
      }
    }
  }

  /** Simple aggregate tying a scanner name to its holder and the quota charged for the scan. */
  static class RegionScannerContext {
    final String scannerName;
    final RegionScannerHolder holder;
    final OperationQuota quota;

    RegionScannerContext(String scannerName, RegionScannerHolder holder, OperationQuota quota) {
      this.scannerName = scannerName;
      this.holder = holder;
      this.quota = quota;
    }
  }

  /**
   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
   */
  static final class RegionScannerHolder {
    // Next expected client call sequence number; advanced via CAS (see incNextCallSeq).
    private final AtomicLong nextCallSeq = new AtomicLong(0);
    private final RegionScanner s;
    private final HRegion r;
    private final RpcCallback closeCallBack;
    private final RpcCallback shippedCallback;
    private byte[] rowOfLastPartialResult;
    private boolean needCursor;
    private boolean fullRegionScan;
    private final String clientIPAndPort;
    private final String userName;
    // Volatile: written by the scan RPC handler, read by monitoring/metrics paths.
    private volatile long maxBlockBytesScanned = 0;
    private volatile long prevBlockBytesScanned = 0;
    private volatile long prevBlockBytesScannedDifference = 0;

    RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
      RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
      String clientIPAndPort, String userName) {
      this.s = s;
      this.r = r;
      this.closeCallBack = closeCallBack;
      this.shippedCallback = shippedCallback;
      this.needCursor = needCursor;
      this.fullRegionScan = fullRegionScan;
      this.clientIPAndPort = clientIPAndPort;
      this.userName = userName;
    }

    long getNextCallSeq() {
      return nextCallSeq.get();
    }

    boolean incNextCallSeq(long currentSeq) {
      // Use CAS to prevent multiple scan request running on the same scanner.
      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
    }

    long getMaxBlockBytesScanned() {
      return maxBlockBytesScanned;
    }

    long getPrevBlockBytesScannedDifference() {
      return prevBlockBytesScannedDifference;
    }

    // blockBytesScanned is cumulative for the scanner's lifetime; track the per-call delta
    // and the running maximum of the cumulative value.
    void updateBlockBytesScanned(long blockBytesScanned) {
      prevBlockBytesScannedDifference = blockBytesScanned - prevBlockBytesScanned;
      prevBlockBytesScanned = blockBytesScanned;
      if (blockBytesScanned > maxBlockBytesScanned) {
        maxBlockBytesScanned = blockBytesScanned;
      }
    }

    // Should be called only when we need to print lease expired messages otherwise
    // cache the String once made.
    @Override
    public String toString() {
      return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName
        + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString();
    }
  }

  /**
   * Instantiated as a scanner lease. If the lease times out, the scanner is closed
   */
  private class ScannerListener implements LeaseListener {
    private final String scannerName;

    ScannerListener(final String n) {
      this.scannerName = n;
    }

    @Override
    public void leaseExpired() {
      // Remove first so concurrent scan RPCs for this scanner fail fast with UnknownScanner.
      RegionScannerHolder rsh = scanners.remove(this.scannerName);
      if (rsh == null) {
        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
        return;
      }
      LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
      server.getMetrics().incrScannerLeaseExpired();
      RegionScanner s = rsh.s;
      HRegion region = null;
      try {
        region = server.getRegion(s.getRegionInfo().getRegionName());
        if (region != null && region.getCoprocessorHost() != null) {
          region.getCoprocessorHost().preScannerClose(s);
        }
      } catch (IOException e) {
        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
      } finally {
        // Always close the scanner, even if the pre-close hook (or region lookup) failed.
        try {
          s.close();
          if (region != null && region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().postScannerClose(s);
          }
        } catch (IOException e) {
          LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
        }
      }
    }
  }

  // Convenience overloads funneling into the builder-based variant below.
  private static ResultOrException getResultOrException(final ClientProtos.Result r,
    final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(r), index);
  }

  private static ResultOrException getResultOrException(final Exception e, final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(e), index);
  }

  private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
    final int index)
{ 552 return builder.setIndex(index).build(); 553 } 554 555 /** 556 * Checks for the following pre-checks in order: 557 * <ol> 558 * <li>RegionServer is running</li> 559 * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li> 560 * </ol> 561 * @param requestName name of rpc request. Used in reporting failures to provide context. 562 * @throws ServiceException If any of the above listed pre-check fails. 563 */ 564 private void rpcPreCheck(String requestName) throws ServiceException { 565 try { 566 checkOpen(); 567 requirePermission(requestName, Permission.Action.ADMIN); 568 } catch (IOException ioe) { 569 throw new ServiceException(ioe); 570 } 571 } 572 573 private boolean isClientCellBlockSupport(RpcCallContext context) { 574 return context != null && context.isClientCellBlockSupported(); 575 } 576 577 private void addResult(final MutateResponse.Builder builder, final Result result, 578 final HBaseRpcController rpcc, boolean clientCellBlockSupported) { 579 if (result == null) return; 580 if (clientCellBlockSupported) { 581 builder.setResult(ProtobufUtil.toResultNoData(result)); 582 rpcc.setCellScanner(result.cellScanner()); 583 } else { 584 ClientProtos.Result pbr = ProtobufUtil.toResult(result); 585 builder.setResult(pbr); 586 } 587 } 588 589 private void addResults(ScanResponse.Builder builder, List<Result> results, 590 HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) { 591 builder.setStale(!isDefaultRegion); 592 if (results.isEmpty()) { 593 return; 594 } 595 if (clientCellBlockSupported) { 596 for (Result res : results) { 597 builder.addCellsPerResult(res.size()); 598 builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow()); 599 } 600 controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(results)); 601 } else { 602 for (Result res : results) { 603 ClientProtos.Result pbr = ProtobufUtil.toResult(res); 604 builder.addResults(pbr); 605 } 606 } 607 } 608 609 private CheckAndMutateResult 
checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
    CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Count of mutations fully decoded from the cell scanner; used in the finally block to
    // skip over the cells of any mutations we never reached, keeping the scanner aligned.
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      List<Mutation> mutations = new ArrayList<>();
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : actions) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto mutation = action.getMutation();
        MutationType type = mutation.getMutateType();
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(mutation, cellScanner);
            // Increment immediately after decode so the finally block skips the right cells.
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            mutations.add(put);
            break;
          case DELETE:
            Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
            ++countOfCompleteMutation;
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            mutations.add(del);
            break;
          case INCREMENT:
            Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
            // Nonce only applies to the non-idempotent mutation types (increment/append).
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, increment);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
            mutations.add(increment);
            break;
          case APPEND:
            Append append = ProtobufUtil.toAppend(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, append);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
            mutations.add(append);
            break;
          default:
            throw new DoNotRetryIOException("invalid mutation type : " + type);
        }
      }

      if (mutations.size() == 0) {
        // Condition with no mutations: treat as a trivially-successful checkAndMutate.
        return new CheckAndMutateResult(true, null);
      } else {
        CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
        CheckAndMutateResult result = null;
        if (region.getCoprocessorHost() != null) {
          // A non-null result from the pre-hook bypasses the default operation.
          result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
        }
        if (result == null) {
          result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
          if (region.getCoprocessorHost() != null) {
            result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
          }
        }
        return result;
      }
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }

  /**
   * Execute an append mutation.
   * @return result to return to client if default operation should be bypassed as indicated by
   *         RegionObserver, null otherwise
   */
  private Result append(final HRegion region, final OperationQuota quota,
    final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
    checkCellSizeLimit(region, append);
    spaceQuota.getPolicyEnforcement(region).check(append);
    quota.addMutation(append);
    long blockBytesScannedBefore = context != null ?
context.getBlockBytesScanned() : 0; 700 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 701 Result r = region.append(append, nonceGroup, nonce); 702 if (server.getMetrics() != null) { 703 long blockBytesScanned = 704 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 705 server.getMetrics().updateAppend(region, EnvironmentEdgeManager.currentTime() - before, 706 blockBytesScanned); 707 } 708 return r == null ? Result.EMPTY_RESULT : r; 709 } 710 711 /** 712 * Execute an increment mutation. 713 */ 714 private Result increment(final HRegion region, final OperationQuota quota, 715 final MutationProto mutation, final CellScanner cells, long nonceGroup, 716 ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException { 717 long before = EnvironmentEdgeManager.currentTime(); 718 Increment increment = ProtobufUtil.toIncrement(mutation, cells); 719 checkCellSizeLimit(region, increment); 720 spaceQuota.getPolicyEnforcement(region).check(increment); 721 quota.addMutation(increment); 722 long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0; 723 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 724 Result r = region.increment(increment, nonceGroup, nonce); 725 final MetricsRegionServer metricsRegionServer = server.getMetrics(); 726 if (metricsRegionServer != null) { 727 long blockBytesScanned = 728 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 729 metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before, 730 blockBytesScanned); 731 } 732 return r == null ? Result.EMPTY_RESULT : r; 733 } 734 735 /** 736 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when 737 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation. 738 * @param cellsToReturn Could be null. May be allocated in this method. 
This is what this method 739 * returns as a 'result'. 740 * @param closeCallBack the callback to be used with multigets 741 * @param context the current RpcCallContext 742 * @return Return the <code>cellScanner</code> passed 743 */ 744 private List<ExtendedCellScannable> doNonAtomicRegionMutation(final HRegion region, 745 final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner, 746 final RegionActionResult.Builder builder, List<ExtendedCellScannable> cellsToReturn, 747 long nonceGroup, final RegionScannersCloseCallBack closeCallBack, RpcCallContext context, 748 ActivePolicyEnforcement spaceQuotaEnforcement) { 749 // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do 750 // one at a time, we instead pass them in batch. Be aware that the corresponding 751 // ResultOrException instance that matches each Put or Delete is then added down in the 752 // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are 753 // deferred/batched 754 List<ClientProtos.Action> mutations = null; 755 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize()); 756 IOException sizeIOE = null; 757 ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = 758 ResultOrException.newBuilder(); 759 boolean hasResultOrException = false; 760 for (ClientProtos.Action action : actions.getActionList()) { 761 hasResultOrException = false; 762 resultOrExceptionBuilder.clear(); 763 try { 764 Result r = null; 765 long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0; 766 if ( 767 context != null && context.isRetryImmediatelySupported() 768 && (context.getResponseCellSize() > maxQuotaResultSize 769 || blockBytesScannedBefore + context.getResponseExceptionSize() > maxQuotaResultSize) 770 ) { 771 772 // We're storing the exception since the exception and reason string won't 773 // change after the response size limit is reached. 
774 if (sizeIOE == null) { 775 // We don't need the stack un-winding do don't throw the exception. 776 // Throwing will kill the JVM's JIT. 777 // 778 // Instead just create the exception and then store it. 779 sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: " 780 + context.getResponseCellSize() + " BlockSize: " + blockBytesScannedBefore); 781 782 // Only report the exception once since there's only one request that 783 // caused the exception. Otherwise this number will dominate the exceptions count. 784 rpcServer.getMetrics().exception(sizeIOE); 785 } 786 787 // Now that there's an exception is known to be created 788 // use it for the response. 789 // 790 // This will create a copy in the builder. 791 NameBytesPair pair = ResponseConverter.buildException(sizeIOE); 792 resultOrExceptionBuilder.setException(pair); 793 context.incrementResponseExceptionSize(pair.getSerializedSize()); 794 resultOrExceptionBuilder.setIndex(action.getIndex()); 795 builder.addResultOrException(resultOrExceptionBuilder.build()); 796 skipCellsForMutation(action, cellScanner); 797 continue; 798 } 799 if (action.hasGet()) { 800 long before = EnvironmentEdgeManager.currentTime(); 801 ClientProtos.Get pbGet = action.getGet(); 802 // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do 803 // a get closest before. Throwing the UnknownProtocolException signals it that it needs 804 // to switch and do hbase2 protocol (HBase servers do not tell clients what versions 805 // they are; its a problem for non-native clients like asynchbase. HBASE-20225. 806 if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) { 807 throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? 
" 808 + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " 809 + "reverse Scan."); 810 } 811 try { 812 Get get = ProtobufUtil.toGet(pbGet); 813 if (context != null) { 814 r = get(get, (region), closeCallBack, context); 815 } else { 816 r = region.get(get); 817 } 818 } finally { 819 final MetricsRegionServer metricsRegionServer = server.getMetrics(); 820 if (metricsRegionServer != null) { 821 long blockBytesScanned = 822 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 823 metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before, 824 blockBytesScanned); 825 } 826 } 827 } else if (action.hasServiceCall()) { 828 hasResultOrException = true; 829 Message result = execServiceOnRegion(region, action.getServiceCall()); 830 ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder = 831 ClientProtos.CoprocessorServiceResult.newBuilder(); 832 resultOrExceptionBuilder.setServiceResult(serviceResultBuilder 833 .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName()) 834 // TODO: Copy!!! 835 .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray())))); 836 } else if (action.hasMutation()) { 837 MutationType type = action.getMutation().getMutateType(); 838 if ( 839 type != MutationType.PUT && type != MutationType.DELETE && mutations != null 840 && !mutations.isEmpty() 841 ) { 842 // Flush out any Puts or Deletes already collected. 
843 doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, 844 spaceQuotaEnforcement); 845 mutations.clear(); 846 } 847 switch (type) { 848 case APPEND: 849 r = append(region, quota, action.getMutation(), cellScanner, nonceGroup, 850 spaceQuotaEnforcement, context); 851 break; 852 case INCREMENT: 853 r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup, 854 spaceQuotaEnforcement, context); 855 break; 856 case PUT: 857 case DELETE: 858 // Collect the individual mutations and apply in a batch 859 if (mutations == null) { 860 mutations = new ArrayList<>(actions.getActionCount()); 861 } 862 mutations.add(action); 863 break; 864 default: 865 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); 866 } 867 } else { 868 throw new HBaseIOException("Unexpected Action type"); 869 } 870 if (r != null) { 871 ClientProtos.Result pbResult = null; 872 if (isClientCellBlockSupport(context)) { 873 pbResult = ProtobufUtil.toResultNoData(r); 874 // Hard to guess the size here. Just make a rough guess. 875 if (cellsToReturn == null) { 876 cellsToReturn = new ArrayList<>(); 877 } 878 cellsToReturn.add(r); 879 } else { 880 pbResult = ProtobufUtil.toResult(r); 881 } 882 addSize(context, r); 883 hasResultOrException = true; 884 resultOrExceptionBuilder.setResult(pbResult); 885 } 886 // Could get to here and there was no result and no exception. Presumes we added 887 // a Put or Delete to the collecting Mutations List for adding later. In this 888 // case the corresponding ResultOrException instance for the Put or Delete will be added 889 // down in the doNonAtomicBatchOp method call rather than up here. 
890 } catch (IOException ie) { 891 rpcServer.getMetrics().exception(ie); 892 hasResultOrException = true; 893 NameBytesPair pair = ResponseConverter.buildException(ie); 894 resultOrExceptionBuilder.setException(pair); 895 context.incrementResponseExceptionSize(pair.getSerializedSize()); 896 } 897 if (hasResultOrException) { 898 // Propagate index. 899 resultOrExceptionBuilder.setIndex(action.getIndex()); 900 builder.addResultOrException(resultOrExceptionBuilder.build()); 901 } 902 } 903 // Finish up any outstanding mutations 904 if (!CollectionUtils.isEmpty(mutations)) { 905 doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); 906 } 907 return cellsToReturn; 908 } 909 910 private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException { 911 if (r.maxCellSize > 0) { 912 CellScanner cells = m.cellScanner(); 913 while (cells.advance()) { 914 int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current()); 915 if (size > r.maxCellSize) { 916 String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of " 917 + r.maxCellSize + " bytes"; 918 LOG.debug(msg); 919 throw new DoNotRetryIOException(msg); 920 } 921 } 922 } 923 } 924 925 private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, 926 final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells, 927 long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { 928 // Just throw the exception. The exception will be caught and then added to region-level 929 // exception for RegionAction. Leaving the null to action result is ok since the null 930 // result is viewed as failure by hbase client. And the region-lever exception will be used 931 // to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and 932 // AsyncBatchRpcRetryingCaller#onComplete for more details. 
    doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
  }

  /**
   * Execute a non-atomic batch of mutations. If the batch as a whole raises an IOException, the
   * exception is fanned out as a per-action exception result instead of a region-level failure.
   */
  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    try {
      // Non-atomic path: no nonce applies to the batch as a whole.
      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
        spaceQuotaEnforcement, false);
    } catch (IOException e) {
      // Set the exception for each action. The mutations in same RegionAction are group to
      // different batch and then be processed individually. Hence, we don't set the region-level
      // exception here for whole RegionAction.
      for (Action mutation : mutations) {
        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
      }
    }
  }

  /**
   * Execute a list of mutations. Results (or exceptions) are appended to <code>builder</code>;
   * when <code>atomic</code> is true the batch comes from the mutateRow/RowMutations API and is
   * applied as one unit with the supplied nonce.
   */
  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
    throws IOException {
    Mutation[] mArray = new Mutation[mutations.size()];
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      /**
       * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions
       * since mutation array may have been reoredered.In order to return the right result or
       * exception to the corresponding actions, We need to know which action is the mutation belong
       * to. We can't sort ClientProtos.Action array, since they are bonded to cellscanners.
968 */ 969 Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>(); 970 int i = 0; 971 long nonce = HConstants.NO_NONCE; 972 for (ClientProtos.Action action : mutations) { 973 if (action.hasGet()) { 974 throw new DoNotRetryIOException( 975 "Atomic put and/or delete only, not a Get=" + action.getGet()); 976 } 977 MutationProto m = action.getMutation(); 978 Mutation mutation; 979 switch (m.getMutateType()) { 980 case PUT: 981 mutation = ProtobufUtil.toPut(m, cells); 982 batchContainsPuts = true; 983 break; 984 985 case DELETE: 986 mutation = ProtobufUtil.toDelete(m, cells); 987 batchContainsDelete = true; 988 break; 989 990 case INCREMENT: 991 mutation = ProtobufUtil.toIncrement(m, cells); 992 nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE; 993 break; 994 995 case APPEND: 996 mutation = ProtobufUtil.toAppend(m, cells); 997 nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE; 998 break; 999 1000 default: 1001 throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType()); 1002 } 1003 mutationActionMap.put(mutation, action); 1004 mArray[i++] = mutation; 1005 checkCellSizeLimit(region, mutation); 1006 // Check if a space quota disallows this mutation 1007 spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation); 1008 quota.addMutation(mutation); 1009 } 1010 1011 if (!region.getRegionInfo().isMetaRegion()) { 1012 server.getMemStoreFlusher().reclaimMemStoreMemory(); 1013 } 1014 1015 // HBASE-17924 1016 // Sort to improve lock efficiency for non-atomic batch of operations. If atomic 1017 // order is preserved as its expected from the client 1018 if (!atomic) { 1019 Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2)); 1020 } 1021 1022 OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce); 1023 1024 // When atomic is true, it indicates that the mutateRow API or the batch API with 1025 // RowMutations is called. 
In this case, we need to merge the results of the 1026 // Increment/Append operations if the mutations include those operations, and set the merged 1027 // result to the first element of the ResultOrException list 1028 if (atomic) { 1029 List<ResultOrException> resultOrExceptions = new ArrayList<>(); 1030 List<Result> results = new ArrayList<>(); 1031 for (i = 0; i < codes.length; i++) { 1032 if (codes[i].getResult() != null) { 1033 results.add(codes[i].getResult()); 1034 } 1035 if (i != 0) { 1036 resultOrExceptions 1037 .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i)); 1038 } 1039 } 1040 1041 if (results.isEmpty()) { 1042 builder.addResultOrException( 1043 getResultOrException(ClientProtos.Result.getDefaultInstance(), 0)); 1044 } else { 1045 // Merge the results of the Increment/Append operations 1046 List<Cell> cellList = new ArrayList<>(); 1047 for (Result result : results) { 1048 if (result.rawCells() != null) { 1049 cellList.addAll(Arrays.asList(result.rawCells())); 1050 } 1051 } 1052 Result result = Result.create(cellList); 1053 1054 // Set the merged result of the Increment/Append operations to the first element of the 1055 // ResultOrException list 1056 builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0)); 1057 } 1058 1059 builder.addAllResultOrException(resultOrExceptions); 1060 return; 1061 } 1062 1063 for (i = 0; i < codes.length; i++) { 1064 Mutation currentMutation = mArray[i]; 1065 ClientProtos.Action currentAction = mutationActionMap.get(currentMutation); 1066 int index = currentAction.hasIndex() ? 
currentAction.getIndex() : i; 1067 Exception e; 1068 switch (codes[i].getOperationStatusCode()) { 1069 case BAD_FAMILY: 1070 e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg()); 1071 builder.addResultOrException(getResultOrException(e, index)); 1072 break; 1073 1074 case SANITY_CHECK_FAILURE: 1075 e = new FailedSanityCheckException(codes[i].getExceptionMsg()); 1076 builder.addResultOrException(getResultOrException(e, index)); 1077 break; 1078 1079 default: 1080 e = new DoNotRetryIOException(codes[i].getExceptionMsg()); 1081 builder.addResultOrException(getResultOrException(e, index)); 1082 break; 1083 1084 case SUCCESS: 1085 ClientProtos.Result result = codes[i].getResult() == null 1086 ? ClientProtos.Result.getDefaultInstance() 1087 : ProtobufUtil.toResult(codes[i].getResult()); 1088 builder.addResultOrException(getResultOrException(result, index)); 1089 break; 1090 1091 case STORE_TOO_BUSY: 1092 e = new RegionTooBusyException(codes[i].getExceptionMsg()); 1093 builder.addResultOrException(getResultOrException(e, index)); 1094 break; 1095 } 1096 } 1097 } finally { 1098 int processedMutationIndex = 0; 1099 for (Action mutation : mutations) { 1100 // The non-null mArray[i] means the cell scanner has been read. 
1101 if (mArray[processedMutationIndex++] == null) { 1102 skipCellsForMutation(mutation, cells); 1103 } 1104 } 1105 updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete); 1106 } 1107 } 1108 1109 private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts, 1110 boolean batchContainsDelete) { 1111 final MetricsRegionServer metricsRegionServer = server.getMetrics(); 1112 if (metricsRegionServer != null) { 1113 long after = EnvironmentEdgeManager.currentTime(); 1114 if (batchContainsPuts) { 1115 metricsRegionServer.updatePutBatch(region, after - starttime); 1116 } 1117 if (batchContainsDelete) { 1118 metricsRegionServer.updateDeleteBatch(region, after - starttime); 1119 } 1120 } 1121 } 1122 1123 /** 1124 * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of 1125 * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse. 1126 * @return an array of OperationStatus which internally contains the OperationStatusCode and the 1127 * exceptionMessage if any 1128 * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying 1129 * edits for secondary replicas any more, see 1130 * {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}. 
1131 */ 1132 @Deprecated 1133 private OperationStatus[] doReplayBatchOp(final HRegion region, 1134 final List<MutationReplay> mutations, long replaySeqId) throws IOException { 1135 long before = EnvironmentEdgeManager.currentTime(); 1136 boolean batchContainsPuts = false, batchContainsDelete = false; 1137 try { 1138 for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) { 1139 MutationReplay m = it.next(); 1140 1141 if (m.getType() == MutationType.PUT) { 1142 batchContainsPuts = true; 1143 } else { 1144 batchContainsDelete = true; 1145 } 1146 1147 NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap(); 1148 List<Cell> metaCells = map.get(WALEdit.METAFAMILY); 1149 if (metaCells != null && !metaCells.isEmpty()) { 1150 for (Cell metaCell : metaCells) { 1151 CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell); 1152 boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); 1153 HRegion hRegion = region; 1154 if (compactionDesc != null) { 1155 // replay the compaction. 
Remove the files from stores only if we are the primary 1156 // region replica (thus own the files) 1157 hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica, 1158 replaySeqId); 1159 continue; 1160 } 1161 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell); 1162 if (flushDesc != null && !isDefaultReplica) { 1163 hRegion.replayWALFlushMarker(flushDesc, replaySeqId); 1164 continue; 1165 } 1166 RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell); 1167 if (regionEvent != null && !isDefaultReplica) { 1168 hRegion.replayWALRegionEventMarker(regionEvent); 1169 continue; 1170 } 1171 BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell); 1172 if (bulkLoadEvent != null) { 1173 hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent); 1174 continue; 1175 } 1176 } 1177 it.remove(); 1178 } 1179 } 1180 requestCount.increment(); 1181 if (!region.getRegionInfo().isMetaRegion()) { 1182 server.getMemStoreFlusher().reclaimMemStoreMemory(); 1183 } 1184 return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]), 1185 replaySeqId); 1186 } finally { 1187 updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete); 1188 } 1189 } 1190 1191 private void closeAllScanners() { 1192 // Close any outstanding scanners. Means they'll get an UnknownScanner 1193 // exception next time they come in. 
    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
      try {
        e.getValue().s.close();
      } catch (IOException ioe) {
        // Best-effort close on shutdown; log and keep closing the rest.
        LOG.warn("Closing scanner " + e.getKey(), ioe);
      }
    }
  }

  // Directly invoked only for testing
  public RSRpcServices(final HRegionServer rs) throws IOException {
    super(rs, rs.getProcessName());
    final Configuration conf = rs.getConfiguration();
    setReloadableGuardrails(conf);
    // Scanner lease timeout doubles as the expiry for the closed-scanner cache below.
    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
    rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
    closedScanners = CacheBuilder.newBuilder()
      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }

  @Override
  protected boolean defaultReservoirEnabled() {
    return true;
  }

  @Override
  protected ServerType getDNSServerType() {
    return DNS.ServerType.REGIONSERVER;
  }

  @Override
  protected String getHostname(Configuration conf, String defaultHostname) {
    return conf.get("hbase.regionserver.ipc.address", defaultHostname);
  }

  @Override
  protected String getPortConfigName() {
    return HConstants.REGIONSERVER_PORT;
  }

  @Override
  protected int getDefaultPort() {
    return HConstants.DEFAULT_REGIONSERVER_PORT;
  }

  @Override
  protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  protected RpcServerInterface createRpcServer(final Server server,
1251 final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress, 1252 final String name) throws IOException { 1253 final Configuration conf = server.getConfiguration(); 1254 boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true); 1255 try { 1256 return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, // use final 1257 // bindAddress 1258 // for this 1259 // server. 1260 conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled); 1261 } catch (BindException be) { 1262 throw new IOException(be.getMessage() + ". To switch ports use the '" 1263 + HConstants.REGIONSERVER_PORT + "' configuration property.", 1264 be.getCause() != null ? be.getCause() : be); 1265 } 1266 } 1267 1268 protected Class<?> getRpcSchedulerFactoryClass() { 1269 final Configuration conf = server.getConfiguration(); 1270 return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, 1271 SimpleRpcSchedulerFactory.class); 1272 } 1273 1274 protected PriorityFunction createPriority() { 1275 return new RSAnnotationReadingPriorityFunction(this); 1276 } 1277 1278 public int getScannersCount() { 1279 return scanners.size(); 1280 } 1281 1282 /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */ 1283 RegionScanner getScanner(long scannerId) { 1284 RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId); 1285 return rsh == null ? null : rsh.s; 1286 } 1287 1288 /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null. 
*/ 1289 private RegionScannerHolder checkQuotaAndGetRegionScannerContext(long scannerId) { 1290 return scanners.get(toScannerName(scannerId)); 1291 } 1292 1293 public String getScanDetailsWithId(long scannerId) { 1294 RegionScanner scanner = getScanner(scannerId); 1295 if (scanner == null) { 1296 return null; 1297 } 1298 StringBuilder builder = new StringBuilder(); 1299 builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString()); 1300 builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString()); 1301 builder.append(" operation_id: ").append(scanner.getOperationId()); 1302 return builder.toString(); 1303 } 1304 1305 public String getScanDetailsWithRequest(ScanRequest request) { 1306 try { 1307 if (!request.hasRegion()) { 1308 return null; 1309 } 1310 Region region = getRegion(request.getRegion()); 1311 StringBuilder builder = new StringBuilder(); 1312 builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString()); 1313 builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString()); 1314 for (NameBytesPair pair : request.getScan().getAttributeList()) { 1315 if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) { 1316 builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray())); 1317 break; 1318 } 1319 } 1320 return builder.toString(); 1321 } catch (IOException ignored) { 1322 return null; 1323 } 1324 } 1325 1326 /** 1327 * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls. 1328 */ 1329 long getScannerVirtualTime(long scannerId) { 1330 RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId); 1331 return rsh == null ? 0L : rsh.getNextCallSeq(); 1332 } 1333 1334 /** 1335 * Method to account for the size of retained cells. 1336 * @param context rpc call context 1337 * @param r result to add size. 1338 * @return an object that represents the last referenced block from this response. 
1339 */ 1340 void addSize(RpcCallContext context, Result r) { 1341 if (context != null && r != null && !r.isEmpty()) { 1342 for (Cell c : r.rawCells()) { 1343 context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c)); 1344 } 1345 } 1346 } 1347 1348 /** Returns Remote client's ip and port else null if can't be determined. */ 1349 @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices", 1350 link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java") 1351 static String getRemoteClientIpAndPort() { 1352 RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null); 1353 if (rpcCall == null) { 1354 return HConstants.EMPTY_STRING; 1355 } 1356 InetAddress address = rpcCall.getRemoteAddress(); 1357 if (address == null) { 1358 return HConstants.EMPTY_STRING; 1359 } 1360 // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name 1361 // resolution. Just use the IP. It is generally a smaller amount of info to keep around while 1362 // scanning than a hostname anyways. 1363 return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString(); 1364 } 1365 1366 /** Returns Remote client's username. 
*/ 1367 @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices", 1368 link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java") 1369 static String getUserName() { 1370 RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null); 1371 if (rpcCall == null) { 1372 return HConstants.EMPTY_STRING; 1373 } 1374 return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING); 1375 } 1376 1377 private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper, 1378 HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException { 1379 Lease lease = server.getLeaseManager().createLease(scannerName, this.scannerLeaseTimeoutPeriod, 1380 new ScannerListener(scannerName)); 1381 RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease); 1382 RpcCallback closeCallback = 1383 s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s); 1384 RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback, 1385 needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName()); 1386 RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh); 1387 assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! 
" 1388 + scannerName + ", " + existing; 1389 return rsh; 1390 } 1391 1392 private boolean isFullRegionScan(Scan scan, HRegion region) { 1393 // If the scan start row equals or less than the start key of the region 1394 // and stop row greater than equals end key (if stop row present) 1395 // or if the stop row is empty 1396 // account this as a full region scan 1397 if ( 1398 Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0 1399 && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0 1400 && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW) 1401 || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) 1402 ) { 1403 return true; 1404 } 1405 return false; 1406 } 1407 1408 /** 1409 * Find the HRegion based on a region specifier 1410 * @param regionSpecifier the region specifier 1411 * @return the corresponding region 1412 * @throws IOException if the specifier is not null, but failed to find the region 1413 */ 1414 public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException { 1415 return server.getRegion(regionSpecifier.getValue().toByteArray()); 1416 } 1417 1418 /** 1419 * Find the List of HRegions based on a list of region specifiers 1420 * @param regionSpecifiers the list of region specifiers 1421 * @return the corresponding list of regions 1422 * @throws IOException if any of the specifiers is not null, but failed to find the region 1423 */ 1424 private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers, 1425 final CacheEvictionStatsBuilder stats) { 1426 List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size()); 1427 for (RegionSpecifier regionSpecifier : regionSpecifiers) { 1428 try { 1429 regions.add(server.getRegion(regionSpecifier.getValue().toByteArray())); 1430 } catch (NotServingRegionException e) { 1431 stats.addException(regionSpecifier.getValue().toByteArray(), e); 1432 } 1433 } 1434 return regions; 1435 } 

  PriorityFunction getPriority() {
    return priority;
  }

  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return server.getRegionServerRpcQuotaManager();
  }

  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return server.getRegionServerSpaceQuotaManager();
  }

  void start(ZKWatcher zkWatcher) {
    // Scanner ids are derived from the server name so they stay unique across restarts.
    this.scannerIdGenerator = new ScannerIdGenerator(this.server.getServerName());
    internalStart(zkWatcher);
  }

  void stop() {
    closeAllScanners();
    internalStop();
  }

  /**
   * Called to verify that this server is up and running.
   * @throws RegionServerAbortedException if the server is aborting
   * @throws RegionServerStoppedException if the server is stopping or its filesystem is gone
   * @throws ServerNotRunningYetException if the server has not finished coming online
   */
  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
  protected void checkOpen() throws IOException {
    // Check order matters: report the most specific terminal state (aborted) first.
    if (server.isAborted()) {
      throw new RegionServerAbortedException("Server " + server.getServerName() + " aborting");
    }
    if (server.isStopped()) {
      throw new RegionServerStoppedException("Server " + server.getServerName() + " stopping");
    }
    if (!server.isDataFileSystemOk()) {
      throw new RegionServerStoppedException("File system not available");
    }
    if (!server.isOnline()) {
      throw new ServerNotRunningYetException(
        "Server " + server.getServerName() + " is not running yet");
    }
  }

  /**
   * By default, put up an Admin and a Client Service. Set booleans
   * <code>hbase.regionserver.admin.executorService</code> and
   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
   * Default is that both are enabled.
1484 * @return immutable list of blocking services and the security info classes that this server 1485 * supports 1486 */ 1487 protected List<BlockingServiceAndInterface> getServices() { 1488 boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true); 1489 boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true); 1490 boolean clientMeta = 1491 getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true); 1492 boolean bootstrapNodes = 1493 getConfiguration().getBoolean(REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG, true); 1494 List<BlockingServiceAndInterface> bssi = new ArrayList<>(); 1495 if (client) { 1496 bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this), 1497 ClientService.BlockingInterface.class)); 1498 } 1499 if (admin) { 1500 bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this), 1501 AdminService.BlockingInterface.class)); 1502 } 1503 if (clientMeta) { 1504 bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this), 1505 ClientMetaService.BlockingInterface.class)); 1506 } 1507 if (bootstrapNodes) { 1508 bssi.add( 1509 new BlockingServiceAndInterface(BootstrapNodeService.newReflectiveBlockingService(this), 1510 BootstrapNodeService.BlockingInterface.class)); 1511 } 1512 return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build(); 1513 } 1514 1515 /** 1516 * Close a region on the region server. 1517 * @param controller the RPC controller 1518 * @param request the request 1519 */ 1520 @Override 1521 @QosPriority(priority = HConstants.ADMIN_QOS) 1522 public CloseRegionResponse closeRegion(final RpcController controller, 1523 final CloseRegionRequest request) throws ServiceException { 1524 final ServerName sn = (request.hasDestinationServer() 1525 ? 
ProtobufUtil.toServerName(request.getDestinationServer()) 1526 : null); 1527 1528 try { 1529 checkOpen(); 1530 throwOnWrongStartCode(request); 1531 final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion()); 1532 1533 requestCount.increment(); 1534 if (sn == null) { 1535 LOG.info("Close " + encodedRegionName + " without moving"); 1536 } else { 1537 LOG.info("Close " + encodedRegionName + ", moving to " + sn); 1538 } 1539 boolean closed = server.closeRegion(encodedRegionName, false, sn); 1540 CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed); 1541 return builder.build(); 1542 } catch (IOException ie) { 1543 throw new ServiceException(ie); 1544 } 1545 } 1546 1547 /** 1548 * Compact a region on the region server. 1549 * @param controller the RPC controller 1550 * @param request the request 1551 */ 1552 @Override 1553 @QosPriority(priority = HConstants.ADMIN_QOS) 1554 public CompactRegionResponse compactRegion(final RpcController controller, 1555 final CompactRegionRequest request) throws ServiceException { 1556 try { 1557 checkOpen(); 1558 requestCount.increment(); 1559 HRegion region = getRegion(request.getRegion()); 1560 // Quota support is enabled, the requesting user is not system/super user 1561 // and a quota policy is enforced that disables compactions. 
1562 if ( 1563 QuotaUtil.isQuotaEnabled(getConfiguration()) 1564 && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null)) 1565 && this.server.getRegionServerSpaceQuotaManager() 1566 .areCompactionsDisabled(region.getTableDescriptor().getTableName()) 1567 ) { 1568 throw new DoNotRetryIOException( 1569 "Compactions on this region are " + "disabled due to a space quota violation."); 1570 } 1571 region.startRegionOperation(Operation.COMPACT_REGION); 1572 LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString()); 1573 boolean major = request.hasMajor() && request.getMajor(); 1574 if (request.hasFamily()) { 1575 byte[] family = request.getFamily().toByteArray(); 1576 String log = "User-triggered " + (major ? "major " : "") + "compaction for region " 1577 + region.getRegionInfo().getRegionNameAsString() + " and family " 1578 + Bytes.toString(family); 1579 LOG.trace(log); 1580 region.requestCompaction(family, log, Store.PRIORITY_USER, major, 1581 CompactionLifeCycleTracker.DUMMY); 1582 } else { 1583 String log = "User-triggered " + (major ? 
"major " : "") + "compaction for region " 1584 + region.getRegionInfo().getRegionNameAsString(); 1585 LOG.trace(log); 1586 region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY); 1587 } 1588 return CompactRegionResponse.newBuilder().build(); 1589 } catch (IOException ie) { 1590 throw new ServiceException(ie); 1591 } 1592 } 1593 1594 @Override 1595 @QosPriority(priority = HConstants.ADMIN_QOS) 1596 public CompactionSwitchResponse compactionSwitch(RpcController controller, 1597 CompactionSwitchRequest request) throws ServiceException { 1598 rpcPreCheck("compactionSwitch"); 1599 final CompactSplit compactSplitThread = server.getCompactSplitThread(); 1600 requestCount.increment(); 1601 boolean prevState = compactSplitThread.isCompactionsEnabled(); 1602 CompactionSwitchResponse response = 1603 CompactionSwitchResponse.newBuilder().setPrevState(prevState).build(); 1604 if (prevState == request.getEnabled()) { 1605 // passed in requested state is same as current state. No action required 1606 return response; 1607 } 1608 compactSplitThread.switchCompaction(request.getEnabled()); 1609 return response; 1610 } 1611 1612 /** 1613 * Flush a region on the region server. 
1614 * @param controller the RPC controller 1615 * @param request the request 1616 */ 1617 @Override 1618 @QosPriority(priority = HConstants.ADMIN_QOS) 1619 public FlushRegionResponse flushRegion(final RpcController controller, 1620 final FlushRegionRequest request) throws ServiceException { 1621 try { 1622 checkOpen(); 1623 requestCount.increment(); 1624 HRegion region = getRegion(request.getRegion()); 1625 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); 1626 boolean shouldFlush = true; 1627 if (request.hasIfOlderThanTs()) { 1628 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs(); 1629 } 1630 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); 1631 if (shouldFlush) { 1632 boolean writeFlushWalMarker = 1633 request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false; 1634 // Go behind the curtain so we can manage writing of the flush WAL marker 1635 HRegion.FlushResultImpl flushResult = null; 1636 if (request.hasFamily()) { 1637 List<byte[]> families = new ArrayList(); 1638 families.add(request.getFamily().toByteArray()); 1639 TableDescriptor tableDescriptor = region.getTableDescriptor(); 1640 List<String> noSuchFamilies = families.stream() 1641 .filter(f -> !tableDescriptor.hasColumnFamily(f)).map(Bytes::toString).toList(); 1642 if (!noSuchFamilies.isEmpty()) { 1643 throw new NoSuchColumnFamilyException("Column families " + noSuchFamilies 1644 + " don't exist in table " + tableDescriptor.getTableName().getNameAsString()); 1645 } 1646 flushResult = 1647 region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1648 } else { 1649 flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1650 } 1651 boolean compactionNeeded = flushResult.isCompactionNeeded(); 1652 if (compactionNeeded) { 1653 server.getCompactSplitThread().requestSystemCompaction(region, 1654 "Compaction through user triggered flush"); 1655 } 1656 
builder.setFlushed(flushResult.isFlushSucceeded()); 1657 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker); 1658 } 1659 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores()); 1660 return builder.build(); 1661 } catch (DroppedSnapshotException ex) { 1662 // Cache flush can fail in a few places. If it fails in a critical 1663 // section, we get a DroppedSnapshotException and a replay of wal 1664 // is required. Currently the only way to do this is a restart of 1665 // the server. 1666 server.abort("Replay of WAL required. Forcing server shutdown", ex); 1667 throw new ServiceException(ex); 1668 } catch (IOException ie) { 1669 throw new ServiceException(ie); 1670 } 1671 } 1672 1673 @Override 1674 @QosPriority(priority = HConstants.ADMIN_QOS) 1675 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, 1676 final GetOnlineRegionRequest request) throws ServiceException { 1677 try { 1678 checkOpen(); 1679 requestCount.increment(); 1680 Map<String, HRegion> onlineRegions = server.getOnlineRegions(); 1681 List<RegionInfo> list = new ArrayList<>(onlineRegions.size()); 1682 for (HRegion region : onlineRegions.values()) { 1683 list.add(region.getRegionInfo()); 1684 } 1685 list.sort(RegionInfo.COMPARATOR); 1686 return ResponseConverter.buildGetOnlineRegionResponse(list); 1687 } catch (IOException ie) { 1688 throw new ServiceException(ie); 1689 } 1690 } 1691 1692 // Master implementation of this Admin Service differs given it is not 1693 // able to supply detail only known to RegionServer. See note on 1694 // MasterRpcServers#getRegionInfo. 
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
    final GetRegionInfoRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      RegionInfo info = region.getRegionInfo();
      byte[] bestSplitRow;
      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
        bestSplitRow = region.checkSplit(true).orElse(null);
        // when all table data are in memstore, bestSplitRow = null
        // try to flush region first
        if (bestSplitRow == null) {
          region.flush(true);
          bestSplitRow = region.checkSplit(true).orElse(null);
        }
      } else {
        bestSplitRow = null;
      }
      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
      if (request.hasCompactionState() && request.getCompactionState()) {
        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
      }
      builder.setSplittable(region.isSplittable());
      builder.setMergeable(region.isMergeable());
      // Only attach the split row when it was requested and one could actually be computed.
      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Builds RegionLoad records for either the regions of one table (when the request names a table)
   * or for every region on this server.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
    throws ServiceException {

    List<HRegion> regions;
    if (request.hasTableName()) {
      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
      regions = server.getRegions(tableName);
    } else {
      regions = server.getRegions();
    }
    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
    // Builders are reused across iterations by createRegionLoad.
    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();

    try {
      for (HRegion region : regions) {
        rLoads.add(server.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
      }
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
    builder.addAllRegionLoads(rLoads);
    return builder.build();
  }

  /**
   * Clears the requested compaction queues ("long" and/or "short"). Guarded by an atomic flag so
   * only one admin can run this at a time; a concurrent request is logged and ignored.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
    ClearCompactionQueuesRequest request) throws ServiceException {
    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
      + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
    requestCount.increment();
    if (clearCompactionQueues.compareAndSet(false, true)) {
      final CompactSplit compactSplitThread = server.getCompactSplitThread();
      try {
        checkOpen();
        server.getRegionServerCoprocessorHost().preClearCompactionQueues();
        for (String queueName : request.getQueueNameList()) {
          LOG.debug("clear " + queueName + " compaction queue");
          switch (queueName) {
            case "long":
              compactSplitThread.clearLongCompactionsQueue();
              break;
            case "short":
              compactSplitThread.clearShortCompactionsQueue();
              break;
            default:
              LOG.warn("Unknown queue name " + queueName);
              throw new IOException("Unknown queue name " + queueName);
          }
        }
        server.getRegionServerCoprocessorHost().postClearCompactionQueues();
      } catch (IOException ie) {
        throw new ServiceException(ie);
      } finally {
        // Always release the guard so subsequent requests can run.
        clearCompactionQueues.set(false);
      }
    } else {
      LOG.warn("Clear compactions queue is executing by other admin.");
    }
    return respBuilder.build();
  }

  /**
   * Get some information of the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetServerInfoResponse getServerInfo(final RpcController controller,
    final GetServerInfoRequest request) throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
    requestCount.increment();
    // -1 signals that no info server is running.
    int infoPort = server.getInfoServer() != null ? server.getInfoServer().getPort() : -1;
    return ResponseConverter.buildGetServerInfoResponse(server.getServerName(), infoPort);
  }

  /**
   * Lists store file paths for the requested region, restricted to the requested column families
   * (all families when none are given).
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetStoreFileResponse getStoreFile(final RpcController controller,
    final GetStoreFileRequest request) throws ServiceException {
    try {
      checkOpen();
      HRegion region = getRegion(request.getRegion());
      requestCount.increment();
      Set<byte[]> columnFamilies;
      if (request.getFamilyCount() == 0) {
        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
      } else {
        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
        for (ByteString cf : request.getFamilyList()) {
          columnFamilies.add(cf.toByteArray());
        }
      }
      int nCF = columnFamilies.size();
      List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
      builder.addAllStoreFile(fileList);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /** Validates the start code carried by an open-region request, if present. */
  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  /** Validates the start code carried by a close-region request, if present. */
  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  /** Rejects the RPC when the caller addressed a different incarnation of this server. */
  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
    // check that we are the same server that this RPC is intended for.
    if (server.getServerName().getStartcode() != serverStartCode) {
      throw new ServiceException(new DoNotRetryIOException(
        "This RPC was intended for a " + "different server with startCode: " + serverStartCode
          + ", this server is: " + server.getServerName()));
    }
  }

  /** Validates start codes on every open/close sub-request of an ExecuteProcedures request. */
  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
    if (req.getOpenRegionCount() > 0) {
      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
        throwOnWrongStartCode(openReq);
      }
    }
    if (req.getCloseRegionCount() > 0) {
      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
        throwOnWrongStartCode(closeReq);
      }
    }
  }

  /**
   * Open asynchronously a region or a set of regions on the region server. The opening is
   * coordinated by ZooKeeper, and this method requires the znode to be created before being called.
   * As a consequence, this method should be called only from the master.
   * <p>
   * Different manages states for the region are:
   * </p>
   * <ul>
   * <li>region not opened: the region opening will start asynchronously.</li>
   * <li>a close is already in progress: this is considered as an error.</li>
   * <li>an open is already in progress: this new open request will be ignored.
This is important
   * because the Master can do multiple requests if it crashes.</li>
   * <li>the region is already opened: this new open request will be ignored.</li>
   * </ul>
   * <p>
   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
   * errors are put in the response as FAILED_OPENING.
   * </p>
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public OpenRegionResponse openRegion(final RpcController controller,
    final OpenRegionRequest request) throws ServiceException {
    requestCount.increment();
    throwOnWrongStartCode(request);

    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
    final int regionCount = request.getOpenInfoCount();
    // Cache table descriptors across the loop so each table is fetched at most once.
    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
    final boolean isBulkAssign = regionCount > 1;
    try {
      checkOpen();
    } catch (IOException ie) {
      // Server not yet open: only tolerate this for a single meta-region assignment (below);
      // everything else fails immediately.
      TableName tableName = null;
      if (regionCount == 1) {
        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
          request.getOpenInfo(0).getRegion();
        if (ri != null) {
          tableName = ProtobufUtil.toTableName(ri.getTableName());
        }
      }
      if (!TableName.META_TABLE_NAME.equals(tableName)) {
        throw new ServiceException(ie);
      }
      // We are assigning meta, wait a little for regionserver to finish initialization.
      // Default to quarter of RPC timeout
      int timeout = server.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
      synchronized (server.online) {
        try {
          while (
            EnvironmentEdgeManager.currentTime() <= endTime && !server.isStopped()
              && !server.isOnline()
          ) {
            server.online.wait(server.getMsgInterval());
          }
          checkOpen();
        } catch (InterruptedException t) {
          // Restore the interrupt status before surfacing the failure.
          Thread.currentThread().interrupt();
          throw new ServiceException(t);
        } catch (IOException e) {
          throw new ServiceException(e);
        }
      }
    }

    // -1 means the master did not supply its system time.
    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;

    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableDescriptor htd;
      try {
        String encodedName = region.getEncodedName();
        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
        final HRegion onlineRegion = server.getRegion(encodedName);
        if (onlineRegion != null) {
          // The region is already online. This should not happen any more.
          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
            + ", which is already online";
          LOG.warn(error);
          // server.abort(error);
          // throw new IOException(error);
          builder.addOpeningState(RegionOpeningState.OPENED);
          continue;
        }
        LOG.info("Open " + region.getRegionNameAsString());

        // previous == null: no transition in flight; FALSE: a close was in flight; TRUE: an open
        // is already in flight.
        final Boolean previous =
          server.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);

        if (Boolean.FALSE.equals(previous)) {
          if (server.getRegion(encodedName) != null) {
            // There is a close in progress. This should not happen any more.
            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
              + ", which we are already trying to CLOSE";
            server.abort(error);
            throw new IOException(error);
          }
          server.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
        }

        if (Boolean.TRUE.equals(previous)) {
          // An open is in progress. This is supported, but let's log this.
          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
            + ", which we are already trying to OPEN"
            + " - ignoring this new request for this region.");
        }

        // We are opening this region. If it moves back and forth for whatever reason, we don't
        // want to keep returning the stale moved record while we are opening/if we close again.
        server.removeFromMovedRegions(region.getEncodedName());

        if (previous == null || !previous.booleanValue()) {
          htd = htds.get(region.getTable());
          if (htd == null) {
            htd = server.getTableDescriptors().get(region.getTable());
            htds.put(region.getTable(), htd);
          }
          if (htd == null) {
            throw new IOException("Missing table descriptor for " + region.getEncodedName());
          }
          // If there is no action in progress, we can submit a specific handler.
          // Need to pass the expected version in the constructor.
          if (server.getExecutorService() == null) {
            LOG.info("No executor executorService; skipping open request");
          } else {
            if (region.isMetaRegion()) {
              server.getExecutorService()
                .submit(new OpenMetaHandler(server, server, region, htd, masterSystemTime));
            } else {
              if (regionOpenInfo.getFavoredNodesCount() > 0) {
                server.updateRegionFavoredNodesMapping(region.getEncodedName(),
                  regionOpenInfo.getFavoredNodesList());
              }
              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
                server.getExecutorService().submit(
                  new OpenPriorityRegionHandler(server, server, region, htd, masterSystemTime));
              } else {
                server.getExecutorService()
                  .submit(new OpenRegionHandler(server, server, region, htd, masterSystemTime));
              }
            }
          }
        }

        builder.addOpeningState(RegionOpeningState.OPENED);
      } catch (IOException ie) {
        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
        if (isBulkAssign) {
          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
        } else {
          throw new ServiceException(ie);
        }
      }
    }
    return builder.build();
  }

  /**
   * Warmup a region on this server. This method should only be called by Master. It synchronously
   * opens the region and closes the region bringing the most important pages in cache.
2048 */ 2049 @Override 2050 public WarmupRegionResponse warmupRegion(final RpcController controller, 2051 final WarmupRegionRequest request) throws ServiceException { 2052 final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo()); 2053 WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance(); 2054 try { 2055 checkOpen(); 2056 String encodedName = region.getEncodedName(); 2057 byte[] encodedNameBytes = region.getEncodedNameAsBytes(); 2058 final HRegion onlineRegion = server.getRegion(encodedName); 2059 if (onlineRegion != null) { 2060 LOG.info("{} is online; skipping warmup", region); 2061 return response; 2062 } 2063 TableDescriptor htd = server.getTableDescriptors().get(region.getTable()); 2064 if (server.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) { 2065 LOG.info("{} is in transition; skipping warmup", region); 2066 return response; 2067 } 2068 LOG.info("Warmup {}", region.getRegionNameAsString()); 2069 HRegion.warmupHRegion(region, htd, server.getWAL(region), server.getConfiguration(), server, 2070 null); 2071 } catch (IOException ie) { 2072 LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie); 2073 throw new ServiceException(ie); 2074 } 2075 2076 return response; 2077 } 2078 2079 private ExtendedCellScanner getAndReset(RpcController controller) { 2080 HBaseRpcController hrc = (HBaseRpcController) controller; 2081 ExtendedCellScanner cells = hrc.cellScanner(); 2082 hrc.setCellScanner(null); 2083 return cells; 2084 } 2085 2086 /** 2087 * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is 2088 * that the given mutations will be durable on the receiving RS if this method returns without any 2089 * exception. 2090 * @param controller the RPC controller 2091 * @param request the request 2092 * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for 2093 * compatibility with old region replica implementation. 
Now we will use 2094 * {@code replicateToReplica} method instead. 2095 */ 2096 @Deprecated 2097 @Override 2098 @QosPriority(priority = HConstants.REPLAY_QOS) 2099 public ReplicateWALEntryResponse replay(final RpcController controller, 2100 final ReplicateWALEntryRequest request) throws ServiceException { 2101 long before = EnvironmentEdgeManager.currentTime(); 2102 ExtendedCellScanner cells = getAndReset(controller); 2103 try { 2104 checkOpen(); 2105 List<WALEntry> entries = request.getEntryList(); 2106 if (entries == null || entries.isEmpty()) { 2107 // empty input 2108 return ReplicateWALEntryResponse.newBuilder().build(); 2109 } 2110 ByteString regionName = entries.get(0).getKey().getEncodedRegionName(); 2111 HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8()); 2112 RegionCoprocessorHost coprocessorHost = 2113 ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo()) 2114 ? region.getCoprocessorHost() 2115 : null; // do not invoke coprocessors if this is a secondary region replica 2116 2117 // Skip adding the edits to WAL if this is a secondary region replica 2118 boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); 2119 Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL; 2120 2121 for (WALEntry entry : entries) { 2122 if (!regionName.equals(entry.getKey().getEncodedRegionName())) { 2123 throw new NotServingRegionException("Replay request contains entries from multiple " 2124 + "regions. First region:" + regionName.toStringUtf8() + " , other region:" 2125 + entry.getKey().getEncodedRegionName()); 2126 } 2127 if (server.nonceManager != null && isPrimary) { 2128 long nonceGroup = 2129 entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; 2130 long nonce = entry.getKey().hasNonce() ? 
entry.getKey().getNonce() : HConstants.NO_NONCE; 2131 server.nonceManager.reportOperationFromWal(nonceGroup, nonce, 2132 entry.getKey().getWriteTime()); 2133 } 2134 Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>(); 2135 List<MutationReplay> edits = 2136 WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability); 2137 if (edits != null && !edits.isEmpty()) { 2138 // HBASE-17924 2139 // sort to improve lock efficiency 2140 Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation)); 2141 long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) 2142 ? entry.getKey().getOrigSequenceNumber() 2143 : entry.getKey().getLogSequenceNumber(); 2144 OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId); 2145 // check if it's a partial success 2146 for (int i = 0; result != null && i < result.length; i++) { 2147 if (result[i] != OperationStatus.SUCCESS) { 2148 throw new IOException(result[i].getExceptionMsg()); 2149 } 2150 } 2151 } 2152 } 2153 2154 // sync wal at the end because ASYNC_WAL is used above 2155 WAL wal = region.getWAL(); 2156 if (wal != null) { 2157 wal.sync(); 2158 } 2159 return ReplicateWALEntryResponse.newBuilder().build(); 2160 } catch (IOException ie) { 2161 throw new ServiceException(ie); 2162 } finally { 2163 final MetricsRegionServer metricsRegionServer = server.getMetrics(); 2164 if (metricsRegionServer != null) { 2165 metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before); 2166 } 2167 } 2168 } 2169 2170 /** 2171 * Replay the given changes on a secondary replica 2172 */ 2173 @Override 2174 public ReplicateWALEntryResponse replicateToReplica(RpcController controller, 2175 ReplicateWALEntryRequest request) throws ServiceException { 2176 CellScanner cells = getAndReset(controller); 2177 try { 2178 checkOpen(); 2179 List<WALEntry> entries = request.getEntryList(); 2180 if (entries == null || entries.isEmpty()) { 2181 // empty input 2182 return 
ReplicateWALEntryResponse.newBuilder().build(); 2183 } 2184 ByteString regionName = entries.get(0).getKey().getEncodedRegionName(); 2185 HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8()); 2186 if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 2187 throw new DoNotRetryIOException( 2188 "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?"); 2189 } 2190 for (WALEntry entry : entries) { 2191 if (!regionName.equals(entry.getKey().getEncodedRegionName())) { 2192 throw new NotServingRegionException( 2193 "ReplicateToReplica request contains entries from multiple " + "regions. First region:" 2194 + regionName.toStringUtf8() + " , other region:" 2195 + entry.getKey().getEncodedRegionName()); 2196 } 2197 region.replayWALEntry(entry, cells); 2198 } 2199 return ReplicateWALEntryResponse.newBuilder().build(); 2200 } catch (IOException ie) { 2201 throw new ServiceException(ie); 2202 } 2203 } 2204 2205 private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException { 2206 ReplicationSourceService replicationSource = server.getReplicationSourceService(); 2207 if (replicationSource == null || entries.isEmpty()) { 2208 return; 2209 } 2210 // We can ensure that all entries are for one peer, so only need to check one entry's 2211 // table name. if the table hit sync replication at peer side and the peer cluster 2212 // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply 2213 // those entries according to the design doc. 
2214 TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray()); 2215 if ( 2216 replicationSource.getSyncReplicationPeerInfoProvider().checkState(table, 2217 RejectReplicationRequestStateChecker.get()) 2218 ) { 2219 throw new DoNotRetryIOException( 2220 "Reject to apply to sink cluster because sync replication state of sink cluster " 2221 + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table); 2222 } 2223 } 2224 2225 /** 2226 * Replicate WAL entries on the region server. 2227 * @param controller the RPC controller 2228 * @param request the request 2229 */ 2230 @Override 2231 @QosPriority(priority = HConstants.REPLICATION_QOS) 2232 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, 2233 final ReplicateWALEntryRequest request) throws ServiceException { 2234 try { 2235 checkOpen(); 2236 if (server.getReplicationSinkService() != null) { 2237 requestCount.increment(); 2238 List<WALEntry> entries = request.getEntryList(); 2239 checkShouldRejectReplicationRequest(entries); 2240 ExtendedCellScanner cellScanner = getAndReset(controller); 2241 server.getRegionServerCoprocessorHost().preReplicateLogEntries(); 2242 server.getReplicationSinkService().replicateLogEntries(entries, cellScanner, 2243 request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), 2244 request.getSourceHFileArchiveDirPath()); 2245 server.getRegionServerCoprocessorHost().postReplicateLogEntries(); 2246 return ReplicateWALEntryResponse.newBuilder().build(); 2247 } else { 2248 throw new ServiceException("Replication services are not initialized yet"); 2249 } 2250 } catch (IOException ie) { 2251 throw new ServiceException(ie); 2252 } 2253 } 2254 2255 /** 2256 * Roll the WAL writer of the region server. 
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public RollWALWriterResponse rollWALWriter(final RpcController controller,
    final RollWALWriterRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      server.getRegionServerCoprocessorHost().preRollWALWriterRequest();
      server.getWalRoller().requestRollAll();
      server.getRegionServerCoprocessorHost().postRollWALWriterRequest();
      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Stop the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public StopServerResponse stopServer(final RpcController controller,
    final StopServerRequest request) throws ServiceException {
    rpcPreCheck("stopServer");
    requestCount.increment();
    String reason = request.getReason();
    server.stop(reason);
    return StopServerResponse.newBuilder().build();
  }

  /**
   * Update this server's favored-nodes mapping for the regions in the request. Entries that carry
   * no favored nodes are skipped; the response reports the number of region entries in the
   * request (whether or not each was applied).
   */
  @Override
  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
    UpdateFavoredNodesRequest request) throws ServiceException {
    rpcPreCheck("updateFavoredNodes");
    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
        server.updateRegionFavoredNodesMapping(hri.getEncodedName(),
          regionUpdateInfo.getFavoredNodesList());
      }
    }
    respBuilder.setResponse(openInfoList.size());
    return respBuilder.build();
  }

  /**
   * Atomically bulk load several HFiles into an open region
   * @return true if successful, false if failed but recoverable (no action)
   * @throws ServiceException if failed unrecoverably
   */
  @Override
  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
    final BulkLoadHFileRequest request) throws ServiceException {
    long start = EnvironmentEdgeManager.currentTime();
    // NOTE(review): if our own cluster id is already in the request's list we answer "loaded"
    // without doing work — presumably to break bulk-load replication cycles; confirm against the
    // replication sink caller.
    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
    if (clusterIds.contains(this.server.getClusterId())) {
      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
    } else {
      clusterIds.add(this.server.getClusterId());
    }
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
      long sizeToBeLoaded = -1;

      // Check to see if this bulk load would exceed the space quota for this table
      if (spaceQuotaEnabled) {
        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
        SpaceViolationPolicyEnforcement enforcement =
          activeSpaceQuotas.getPolicyEnforcement(region);
        if (enforcement != null) {
          // Bulk loads must still be atomic. We must enact all or none.
          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
          for (FamilyPath familyPath : request.getFamilyPathList()) {
            filePaths.add(familyPath.getPath());
          }
          // Check if the batch of files exceeds the current quota
          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
        }
      }
      // secure bulk load
      Map<byte[], List<Path>> map =
        server.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
      builder.setLoaded(map != null);
      if (map != null) {
        // Treat any negative size as a flag to "ignore" updating the region size as that is
        // not possible to occur in real life (cannot bulk load a file with negative size)
        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
              + sizeToBeLoaded + " bytes");
          }
          // Inform space quotas of the new files for this region
          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
            sizeToBeLoaded);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record bulk-load latency even on failure.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
      }
    }
  }

  /**
   * Open a secure bulk load session on the target region and return its bulk token in the
   * response.
   */
  @Override
  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
    PrepareBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      String bulkToken = server.getSecureBulkLoadManager().prepareBulkLoad(region, request);
      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
      builder.setBulkToken(bulkToken);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Tear down a secure bulk load session previously opened via
   * {@link #prepareBulkLoad(RpcController, PrepareBulkLoadRequest)}.
   */
  @Override
  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
    CleanupBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      server.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
      return CleanupBulkLoadResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /** Execute a coprocessor endpoint call against a single region. */
  @Override
  public CoprocessorServiceResponse execService(final RpcController controller,
    final CoprocessorServiceRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      Message result = execServiceOnRegion(region, request.getCall());
      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
        region.getRegionInfo().getRegionName()));
      // TODO: COPIES!!!!!!
2422 builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue( 2423 org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray()))); 2424 return builder.build(); 2425 } catch (IOException ie) { 2426 throw new ServiceException(ie); 2427 } 2428 } 2429 2430 private FileSystem getFileSystem(List<String> filePaths) throws IOException { 2431 if (filePaths.isEmpty()) { 2432 // local hdfs 2433 return server.getFileSystem(); 2434 } 2435 // source hdfs 2436 return new Path(filePaths.get(0)).getFileSystem(server.getConfiguration()); 2437 } 2438 2439 private Message execServiceOnRegion(HRegion region, 2440 final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException { 2441 // ignore the passed in controller (from the serialized call) 2442 ServerRpcController execController = new ServerRpcController(); 2443 return region.execService(execController, serviceCall); 2444 } 2445 2446 private boolean shouldRejectRequestsFromClient(HRegion region) { 2447 TableName table = region.getRegionInfo().getTable(); 2448 ReplicationSourceService service = server.getReplicationSourceService(); 2449 return service != null && service.getSyncReplicationPeerInfoProvider().checkState(table, 2450 RejectRequestsFromClientStateChecker.get()); 2451 } 2452 2453 private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException { 2454 if (shouldRejectRequestsFromClient(region)) { 2455 throw new DoNotRetryIOException( 2456 region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state."); 2457 } 2458 } 2459 2460 /** 2461 * Get data from a table. 
   * @param controller the RPC controller
   * @param request    the get request
   */
  @Override
  public GetResponse get(final RpcController controller, final GetRequest request)
    throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    OperationQuota quota = null;
    HRegion region = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    try {
      checkOpen();
      requestCount.increment();
      rpcGetRequestCount.increment();
      region = getRegion(request.getRegion());
      rejectIfInStandByState(region);

      GetResponse.Builder builder = GetResponse.newBuilder();
      ClientProtos.Get get = request.getGet();
      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
          + "reverse Scan.");
      }
      Boolean existence = null;
      Result r = null;
      quota = getRpcQuotaManager().checkBatchQuota(region, OperationQuota.OperationType.GET);

      Get clientGet = ProtobufUtil.toGet(get);
      // Existence-only gets may be answered entirely by the preExists coprocessor hook.
      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
        existence = region.getCoprocessorHost().preExists(clientGet);
      }
      if (existence == null) {
        if (context != null) {
          r = get(clientGet, (region), null, context);
        } else {
          // for test purpose
          r = region.get(clientGet);
        }
        if (get.getExistenceOnly()) {
          boolean exists = r.getExists();
          if (region.getCoprocessorHost() != null) {
            exists = region.getCoprocessorHost().postExists(clientGet, exists);
          }
          existence = exists;
        }
      }
      if (existence != null) {
        ClientProtos.Result pbr =
          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
        builder.setResult(pbr);
      } else if (r != null) {
        ClientProtos.Result pbr;
        // Cell-block capable clients (>= 1.3) get the cells out-of-band via the controller's
        // cell scanner; older or non-cell-block clients get them embedded in the pb Result.
        if (
          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
        ) {
          pbr = ProtobufUtil.toResultNoData(r);
          ((HBaseRpcController) controller).setCellScanner(
            PrivateCellUtil.createExtendedCellScanner(ClientInternalHelper.getExtendedRawCells(r)));
          addSize(context, r);
        } else {
          pbr = ProtobufUtil.toResult(r);
        }
        builder.setResult(pbr);
      }
      // r.cells is null when a table.exists(get) call
      if (r != null && r.rawCells() != null) {
        quota.addGetResult(r);
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null && region != null) {
        // Record get latency and block bytes scanned even when the call failed.
        long blockBytesScanned = context != null ? context.getBlockBytesScanned() : 0;
        metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
          blockBytesScanned);
      }
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Perform a Get by running it as a single-row Scan (this method is almost the same as
   * HRegion#get), wiring the scanner into a callback so it is closed after the response is sent.
   * @param closeCallBack when non-null (the multi() path) the scanner is registered on it instead
   *                      of directly on the call context
   * @param context       must be non-null: it is dereferenced for block-bytes accounting and used
   *                      as the scanner close callback when {@code closeCallBack} is null
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
    RpcCallContext context) throws IOException {
    region.prepareGet(get);
    // Results served by a non-default replica are flagged stale.
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    // pre-get CP hook; a true return short-circuits the region read.
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        region.metricsUpdateForGet();
        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
          stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    long blockBytesScannedBefore = context.getBlockBytesScanned();
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet();

    Result r =
      Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
    if (get.isQueryMetricsEnabled()) {
      // Only the block bytes scanned by this particular get, not the whole call.
      long blockBytesScanned = context.getBlockBytesScanned() - blockBytesScannedBefore;
      r.setMetrics(new QueryMetrics(blockBytesScanned));
    }
    return r;
  }

  /**
   * Warn when a multi request's total action count exceeds rowSizeWarnThreshold, and reject it
   * with a ServiceException when rejectRowsWithSizeOverThreshold is set. The logged/reported
   * region name is the one captured while the running count was still zero.
   */
  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
    int sum = 0;
    String firstRegionName = null;
    for (RegionAction regionAction : request.getRegionActionList()) {
      if (sum == 0) {
        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
      }
      sum += regionAction.getActionCount();
    }
    if (sum > rowSizeWarnThreshold) {
      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
        + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
        + RpcServer.getRequestUserName().orElse(null) + "/"
        + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
      if (rejectRowsWithSizeOverThreshold) {
        throw new ServiceException(
          "Rejecting large batch operation for current batch with firstRegionName: "
            + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
            + rowSizeWarnThreshold);
      }
    }
  }

  /**
   * Record {@code error} as the result of an entire RegionAction: count it in the RPC metrics,
   * attach it to the action result, and skip past the cells in the shared CellScanner that belong
   * to the failed mutations so subsequent actions stay aligned.
   */
  private void failRegionAction(MultiResponse.Builder responseBuilder,
    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
    CellScanner cellScanner, Throwable error) {
    rpcServer.getMetrics().exception(error);
    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
    // All Mutations in this RegionAction not executed as we can not see the Region online here
    // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
    // corresponding to these Mutations.
    if (cellScanner != null) {
      skipCellsForMutations(regionAction.getActionList(), cellScanner);
    }
  }

  /**
   * Whether this action is a replication request: a PUT or DELETE mutation carrying the special
   * replication attribute set by the replication machinery.
   */
  private boolean isReplicationRequest(Action action) {
    // replication request can only be put or delete.
    if (!action.hasMutation()) {
      return false;
    }
    MutationProto mutation = action.getMutation();
    MutationType type = mutation.getMutateType();
    if (type != MutationType.PUT && type != MutationType.DELETE) {
      return false;
    }
    // replication will set a special attribute so we can make use of it to decide whether a request
    // is for replication.
    return mutation.getAttributeList().stream().map(p -> p.getName())
      .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
  }

  /**
   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
   * @param rpcc    the RPC controller
   * @param request the multi request
   */
  @Override
  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
    throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }

    checkBatchSizeAndLogLargeSize(request);

    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? getAndReset(controller) : null;

    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;

    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
    this.rpcMultiRequestCount.increment();
    this.requestCount.increment();
    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();

    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
    // following logic is for backward compatibility as old clients still use
    // MultiRequest#condition in case of checkAndMutate with RowMutations.
    if (request.hasCondition()) {
      if (request.getRegionActionList().isEmpty()) {
        // If the region action list is empty, do nothing.
        responseBuilder.setProcessed(true);
        return responseBuilder.build();
      }

      RegionAction regionAction = request.getRegionAction(0);

      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
      // we can assume regionAction.getAtomic() is true here.
      assert regionAction.getAtomic();

      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
          regionAction.hasCondition());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        return responseBuilder.build();
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
        // We only allow replication in standby state and it will not set the atomic flag.
        if (rejectIfFromClient) {
          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
            new DoNotRetryIOException(
              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
          return responseBuilder.build();
        }

        try {
          CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
            cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
          responseBuilder.setProcessed(result.isSuccess());
          ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
            ClientProtos.ResultOrException.newBuilder();
          for (int i = 0; i < regionAction.getActionCount(); i++) {
            // To unify the response format with doNonAtomicRegionMutation and read through
            // client's AsyncProcess we have to add an empty result instance per operation
            resultOrExceptionOrBuilder.clear();
            resultOrExceptionOrBuilder.setIndex(i);
            regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
          }
        } catch (IOException e) {
          rpcServer.getMetrics().exception(e);
          // As it's an atomic operation with a condition, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        }
      } finally {
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
      }
      return responseBuilder.build();
    }

    // this will contain all the cells that we need to return. It's created later, if needed.
    List<ExtendedCellScannable> cellsToReturn = null;
    RegionScannersCloseCallBack closeCallBack = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
      new HashMap<>(request.getRegionActionCount());

    for (RegionAction regionAction : request.getRegionActionList()) {
      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();
      regionActionResultBuilder.clear();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
          regionAction.hasCondition());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        continue; // For this region it's a failure.
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);

        if (regionAction.hasCondition()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }

          try {
            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
              ClientProtos.ResultOrException.newBuilder();
            if (regionAction.getActionCount() == 1) {
              // Single-mutation checkAndMutate: its Result (and optional metrics) ride on the
              // first ResultOrException entry.
              CheckAndMutateResult result =
                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              resultOrExceptionOrBuilder.setIndex(0);
              if (result.getResult() != null) {
                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
              }

              if (result.getMetrics() != null) {
                resultOrExceptionOrBuilder
                  .setMetrics(ProtobufUtil.toQueryMetrics(result.getMetrics()));
              }

              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
            } else {
              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              for (int i = 0; i < regionAction.getActionCount(); i++) {
                if (i == 0 && result.getResult() != null) {
                  // Set the result of the Increment/Append operations to the first element of the
                  // ResultOrException list
                  resultOrExceptionOrBuilder.setIndex(i);
                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
                  continue;
                }
                // To unify the response format with doNonAtomicRegionMutation and read through
                // client's AsyncProcess we have to add an empty result instance per operation
                resultOrExceptionOrBuilder.clear();
                resultOrExceptionOrBuilder.setIndex(i);
                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
              }
            }
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's an atomic operation with a condition, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }
          try {
            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
              cellScanner, nonceGroup, spaceQuotaEnforcement);
            regionActionResultBuilder.setProcessed(true);
            // We no longer use MultiResponse#processed. Instead, we use
            // RegionActionResult#processed. This is for backward compatibility for old clients.
            responseBuilder.setProcessed(true);
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's atomic, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else {
          if (
            rejectIfFromClient && regionAction.getActionCount() > 0
              && !isReplicationRequest(regionAction.getAction(0))
          ) {
            // fail if it is not a replication request
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }
          // doNonAtomicRegionMutation manages the exception internally
          if (context != null && closeCallBack == null) {
            // An RpcCallBack that creates a list of scanners that needs to perform callBack
            // operation on completion of multiGets.
            // Set this only once
            closeCallBack = new RegionScannersCloseCallBack();
            context.setCallBack(closeCallBack);
          }
          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
            spaceQuotaEnforcement);
        }
      } finally {
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        regionStats.put(regionSpecifier, regionLoadStats);
      }
    }
    // Load the controller with the Cells to return.
    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
      controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(cellsToReturn));
    }

    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
      builder.addRegion(stat.getKey());
      builder.addStat(stat.getValue());
    }
    responseBuilder.setRegionStatistics(builder);
    return responseBuilder.build();
  }

  /**
   * Advance the shared CellScanner past the cells belonging to every mutation in {@code actions},
   * so later actions in the same request stay aligned with their cells. No-op when the scanner is
   * null.
   */
  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    for (Action action : actions) {
      skipCellsForMutation(action, cellScanner);
    }
  }

  /**
   * Advance the shared CellScanner past the cells associated with one action's mutation (per its
   * associated-cell count). Scanner errors are logged, not rethrown.
   */
  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    try {
      if (action.hasMutation()) {
        MutationProto m = action.getMutation();
        if (m.hasAssociatedCellCount()) {
          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
            cellScanner.advance();
          }
        }
      }
    } catch (IOException e) {
      // No need to handle these individual Mutation level issues. Any way this entire RegionAction
      // marked as failed as we could not see the Region here. At client side the top level
      // RegionAction exception will be considered first.
      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
    }
  }

  /**
   * Mutate data in a table.
   * @param rpcc    the RPC controller
   * @param request the mutate request
   */
  @Override
  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
    throws ServiceException {
    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
2946 HBaseRpcController controller = (HBaseRpcController) rpcc; 2947 CellScanner cellScanner = controller != null ? controller.cellScanner() : null; 2948 OperationQuota quota = null; 2949 RpcCallContext context = RpcServer.getCurrentCall().orElse(null); 2950 // Clear scanner so we are not holding on to reference across call. 2951 if (controller != null) { 2952 controller.setCellScanner(null); 2953 } 2954 try { 2955 checkOpen(); 2956 requestCount.increment(); 2957 rpcMutateRequestCount.increment(); 2958 HRegion region = getRegion(request.getRegion()); 2959 rejectIfInStandByState(region); 2960 MutateResponse.Builder builder = MutateResponse.newBuilder(); 2961 MutationProto mutation = request.getMutation(); 2962 if (!region.getRegionInfo().isMetaRegion()) { 2963 server.getMemStoreFlusher().reclaimMemStoreMemory(); 2964 } 2965 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; 2966 OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request); 2967 quota = getRpcQuotaManager().checkBatchQuota(region, operationType); 2968 ActivePolicyEnforcement spaceQuotaEnforcement = 2969 getSpaceQuotaManager().getActiveEnforcements(); 2970 2971 if (request.hasCondition()) { 2972 CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner, 2973 request.getCondition(), nonceGroup, spaceQuotaEnforcement, context); 2974 builder.setProcessed(result.isSuccess()); 2975 boolean clientCellBlockSupported = isClientCellBlockSupport(context); 2976 addResult(builder, result.getResult(), controller, clientCellBlockSupported); 2977 if (clientCellBlockSupported) { 2978 addSize(context, result.getResult()); 2979 } 2980 if (result.getMetrics() != null) { 2981 builder.setMetrics(ProtobufUtil.toQueryMetrics(result.getMetrics())); 2982 } 2983 } else { 2984 Result r = null; 2985 Boolean processed = null; 2986 MutationType type = mutation.getMutateType(); 2987 switch (type) { 2988 case APPEND: 2989 // TODO: this doesn't 
          // TODO: this doesn't actually check anything.
          r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
            context);
          break;
        case INCREMENT:
          // TODO: this doesn't actually check anything.
          r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
            context);
          break;
        case PUT:
          put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
          processed = Boolean.TRUE;
          break;
        case DELETE:
          delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
          processed = Boolean.TRUE;
          break;
        default:
          throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
      }
      if (processed != null) {
        builder.setProcessed(processed);
      }
      boolean clientCellBlockSupported = isClientCellBlockSupport(context);
      addResult(builder, r, controller, clientCellBlockSupported);
      if (clientCellBlockSupported) {
        addSize(context, r);
      }
    }
    return builder.build();
  } catch (IOException ie) {
    // Any IOException may indicate filesystem trouble; prompt a filesystem health check
    // before surfacing the error to the client.
    server.checkFileSystem();
    throw new ServiceException(ie);
  } finally {
    if (quota != null) {
      quota.close();
    }
  }
}

/**
 * Converts the proto mutation into a {@link Put}, applies the cell-size and space-quota
 * checks, charges the operation quota, executes the put against the region, and records
 * per-region put latency if server metrics are enabled.
 */
private void put(HRegion region, OperationQuota quota, MutationProto mutation,
  CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
  long before = EnvironmentEdgeManager.currentTime();
  Put put = ProtobufUtil.toPut(mutation, cellScanner);
  checkCellSizeLimit(region, put);
  spaceQuota.getPolicyEnforcement(region).check(put);
  quota.addMutation(put);
  region.put(put);

  MetricsRegionServer metricsRegionServer = server.getMetrics();
  if (metricsRegionServer != null) {
    long after = EnvironmentEdgeManager.currentTime();
    metricsRegionServer.updatePut(region, after - before);
  }
}

/**
 * Converts the proto mutation into a {@link Delete}, applies the cell-size and space-quota
 * checks, charges the operation quota, executes the delete against the region, and records
 * per-region delete latency if server metrics are enabled.
 */
private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
  CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
  long before = EnvironmentEdgeManager.currentTime();
  Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
  checkCellSizeLimit(region, delete);
  spaceQuota.getPolicyEnforcement(region).check(delete);
  quota.addMutation(delete);
  region.delete(delete);

  MetricsRegionServer metricsRegionServer = server.getMetrics();
  if (metricsRegionServer != null) {
    long after = EnvironmentEdgeManager.currentTime();
    metricsRegionServer.updateDelete(region, after - before);
  }
}

/**
 * Executes a server-side check-and-mutate: runs the optional coprocessor preCheckAndMutate
 * hook, then the atomic region-level checkAndMutate (unless the pre-hook short-circuited it
 * by returning a non-null result), then the postCheckAndMutate hook, and finally updates
 * checkAndMutate plus checkAndPut/checkAndDelete latency metrics.
 */
private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
  MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
  ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
  long before = EnvironmentEdgeManager.currentTime();
  // Snapshot block bytes scanned so far so we can report only this operation's delta.
  long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
  CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
  long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
  checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
  spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
  quota.addMutation((Mutation) checkAndMutate.getAction());

  CheckAndMutateResult result = null;
  if (region.getCoprocessorHost() != null) {
    result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
  }
  if (result == null) {
    // Pre-hook did not bypass: run the real region operation, then the post-hook.
    result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
    if (region.getCoprocessorHost() != null) {
      result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
    }
  }
  MetricsRegionServer metricsRegionServer = server.getMetrics();
  if (metricsRegionServer != null) {
    long after = EnvironmentEdgeManager.currentTime();
    long blockBytesScanned =
      context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
    metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned);

    MutationType type = mutation.getMutateType();
    switch (type) {
      case PUT:
        metricsRegionServer.updateCheckAndPut(region, after - before);
        break;
      case DELETE:
        metricsRegionServer.updateCheckAndDelete(region, after - before);
        break;
      default:
        break;
    }
  }
  return result;
}

// This is used to keep compatible with the old client implementation. Consider removing it if
// we decide to drop the support of the client that still sends a close request to a region
// scanner which has already been exhausted.
@Deprecated
private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {

  private static final long serialVersionUID = -4305297078988180130L;

  @Override
  public synchronized Throwable fillInStackTrace() {
    // Suppress stack-trace capture: this shared sentinel is thrown on a hot path purely as a
    // control signal (see the scan RPC, which compares against it by identity) and never
    // needs a meaningful trace.
    return this;
  }
};

/**
 * Resolves the {@link RegionScannerHolder} for an existing scanner id carried by the request.
 * Throws {@code SCANNER_ALREADY_CLOSED} (identity-compared by the caller) for the benign
 * retry-after-exhaustion case, {@link OutOfOrderScannerNextException} for a bad retry
 * sequence, {@link UnknownScannerException} when the scanner is simply unknown, and
 * {@link NotServingRegionException} when the region instance changed underneath the scanner.
 */
private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
  String scannerName = toScannerName(request.getScannerId());
  RegionScannerHolder rsh = this.scanners.get(scannerName);
  if (rsh == null) {
    // just ignore the next or close request if scanner does not exists.
    Long lastCallSeq = closedScanners.getIfPresent(scannerName);
    if (lastCallSeq != null) {
      // Check the sequence number to catch if the last call was incorrectly retried.
      // The only allowed scenario is when the scanner is exhausted and one more scan
      // request arrives - in this case returning 0 rows is correct.
      if (request.hasNextCallSeq() && request.getNextCallSeq() != lastCallSeq + 1) {
        throw new OutOfOrderScannerNextException("Expected nextCallSeq for closed request: "
          + (lastCallSeq + 1) + " But the nextCallSeq got from client: "
          + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
      } else {
        throw SCANNER_ALREADY_CLOSED;
      }
    } else {
      LOG.warn("Client tried to access missing scanner " + scannerName);
      // NOTE(review): the concatenation below produces "...the value of'hbase.client..." --
      // a space is missing before the quoted configuration name. Left as-is here since a
      // doc-only change must not alter runtime strings.
      throw new UnknownScannerException(
        "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
          + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
          + "long wait between consecutive client checkins, c) Server may be closing down, "
          + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
          + "possible fix would be increasing the value of"
          + "'hbase.client.scanner.timeout.period' configuration.");
    }
  }
  rejectIfInStandByState(rsh.r);
  RegionInfo hri = rsh.s.getRegionInfo();
  // Yes, should be the same instance
  if (server.getOnlineRegion(hri.getRegionName()) != rsh.r) {
    // NOTE(review): "scannerRegionName=" prints the HRegion object (rsh.r) rather than a
    // region name string -- possibly meant rsh.r.getRegionInfo().getRegionNameAsString().
    String msg = "Region has changed on the scanner " + scannerName + ": regionName="
      + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r;
    LOG.warn(msg + ", closing...");
    scanners.remove(scannerName);
    try {
      rsh.s.close();
    } catch (IOException e) {
      LOG.warn("Getting exception closing " + scannerName, e);
    } finally {
      // Always cancel the lease, even if closing the scanner itself failed.
      try {
        server.getLeaseManager().cancelLease(scannerName);
      } catch (LeaseException e) {
        LOG.warn("Getting exception closing " + scannerName, e);
      }
    }
    throw new NotServingRegionException(msg);
  }
  return rsh;
}

/**
 * Builds a brand-new region scanner for an open-scanner request: applies the
 * load-column-families-on-demand default, expands an empty family list to all families, runs
 * the coprocessor pre/post open hooks, assigns a new scanner id, and registers the scanner.
 * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
 *         value.
 */
private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request, HRegion region,
  ScanResponse.Builder builder) throws IOException {
  rejectIfInStandByState(region);
  ClientProtos.Scan protoScan = request.getScan();
  boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
  Scan scan = ProtobufUtil.toScan(protoScan);
  // if the request doesn't set this, get the default region setting.
  if (!isLoadingCfsOnDemandSet) {
    scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
  }

  if (!scan.hasFamilies()) {
    // Adding all families to scanner
    for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
      scan.addFamily(family);
    }
  }
  if (region.getCoprocessorHost() != null) {
    // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
    // wrapper for the core created RegionScanner
    region.getCoprocessorHost().preScannerOpen(scan);
  }
  RegionScannerImpl coreScanner = region.getScanner(scan);
  // Keep a handle on the core scanner as the Shipper even if a coprocessor wraps the
  // RegionScanner below.
  Shipper shipper = coreScanner;
  RegionScanner scanner = coreScanner;
  try {
    if (region.getCoprocessorHost() != null) {
      scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
    }
  } catch (Exception e) {
    // Although region coprocessor is for advanced users and they should take care of the
    // implementation to not damage the HBase system, closing the scanner on exception here does
    // not have any bad side effect, so let's do it
    scanner.close();
    throw e;
  }
  long scannerId = scannerIdGenerator.generateNewScannerId();
  builder.setScannerId(scannerId);
  builder.setMvccReadPoint(scanner.getMvccReadPoint());
  builder.setTtl(scannerLeaseTimeoutPeriod);
  String scannerName = toScannerName(scannerId);

  // System tables are excluded from full-region-scan accounting.
  boolean fullRegionScan =
    !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region);

  return new Pair<String, RegionScannerHolder>(scannerName,
    addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan));
}

/**
 * The returned String is used as key doing look up of outstanding Scanners in this Servers'
 * this.scanners, the Map of outstanding scanners and their current state.
 * @param scannerId A scanner long id.
 * @return The long id as a String.
 */
private static String toScannerName(long scannerId) {
  return Long.toString(scannerId);
}

/**
 * Validates the client-supplied nextCallSeq against the holder's expected value and advances
 * it on success.
 * @throws OutOfOrderScannerNextException if the sequence number does not match (HBASE-5974)
 */
private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
  throws OutOfOrderScannerNextException {
  // if nextCallSeq does not match throw Exception straight away. This needs to be
  // performed even before checking of Lease.
  // See HBASE-5974
  if (request.hasNextCallSeq()) {
    long callSeq = request.getNextCallSeq();
    if (!rsh.incNextCallSeq(callSeq)) {
      throw new OutOfOrderScannerNextException(
        "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: "
          + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request)
          + "; region=" + rsh.r.getRegionInfo().getRegionNameAsString());
    }
  }
}

// Re-registers a previously removed scanner lease once request processing is done.
private void addScannerLeaseBack(LeaseManager.Lease lease) {
  try {
    server.getLeaseManager().addLease(lease);
  } catch (LeaseStillHeldException e) {
    // should not happen as the scanner id is unique.
    throw new AssertionError(e);
  }
}

// visible for testing only
long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller,
  boolean allowHeartbeatMessages) {
  // Set the time limit to be half of the more restrictive timeout value (one of the
  // timeout values must be positive). In the event that both values are positive, the
  // more restrictive of the two is used to calculate the limit.
  if (allowHeartbeatMessages) {
    long now = EnvironmentEdgeManager.currentTime();
    long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now);
    if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) {
      long timeLimitDelta;
      if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) {
        timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout);
      } else {
        // Exactly one of the two is positive; use it.
        timeLimitDelta =
          scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout;
      }

      // Use half of whichever timeout value was more restrictive... But don't allow
      // the time limit to be less than the allowable minimum (could cause an
      // immediate timeout before scanning any data).
      timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
      // Returned value is an absolute deadline (wall-clock millis), not a delta.
      return now + timeLimitDelta;
    }
  }
  // Default value of timeLimit is negative to indicate no timeLimit should be
  // enforced.
  return -1L;
}

/**
 * Computes how much of the RPC timeout budget remains for this call, preferring the
 * controller's per-call timeout over the server-wide rpcTimeout, and subtracting the time
 * the call has already spent since it was received.
 */
private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) {
  long timeout;
  if (controller != null && controller.getCallTimeout() > 0) {
    timeout = controller.getCallTimeout();
  } else if (rpcTimeout > 0) {
    timeout = rpcTimeout;
  } else {
    // No timeout configured anywhere: signal "no limit" to getTimeLimit.
    return -1;
  }
  if (call != null) {
    timeout -= (now - call.getReceiveTime());
  }
  // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high.
  // return minimum value here in that case so we count this in calculating the final delta.
  return Math.max(minimumScanTimeLimitDelta, timeout);
}

/**
 * Flips moreResults to false on the response builder once the requested row limit has been
 * reached. No-op otherwise.
 */
private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
  ScannerContext scannerContext, ScanResponse.Builder builder) {
  if (numOfCompleteRows >= limitOfRows) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows
        + " scannerContext: " + scannerContext);
    }
    builder.setMoreResults(false);
  }
}

// Core scan loop. Accumulates Results into the supplied 'results' list; whether more results
// remain in the region is conveyed via builder.setMoreResultsInRegion (the method itself
// returns void).
private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
  long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
  ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
  HRegion region = rsh.r;
  RegionScanner scanner = rsh.s;
  long maxResultSize;
  if (scanner.getMaxResultSize() > 0) {
    maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
  } else {
    maxResultSize = maxQuotaResultSize;
  }
  // This is cells inside a row. Default size is 10 so if many versions or many cfs,
  // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
  // arbitrary 32. TODO: keep record of general size of results being returned.
  ArrayList<Cell> values = new ArrayList<>(32);
  region.startRegionOperation(Operation.SCAN);
  long before = EnvironmentEdgeManager.currentTime();
  // Used to check if we've matched the row limit set on the Scan
  int numOfCompleteRows = 0;
  // Count of times we call nextRaw; can be > numOfCompleteRows.
  int numOfNextRawCalls = 0;
  try {
    int numOfResults = 0;
    synchronized (scanner) {
      // Results from a non-default replica are marked stale.
      boolean stale = (region.getRegionInfo().getReplicaId() != 0);
      boolean clientHandlesPartials =
        request.hasClientHandlesPartials() && request.getClientHandlesPartials();
      boolean clientHandlesHeartbeats =
        request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();

      // On the server side we must ensure that the correct ordering of partial results is
      // returned to the client to allow them to properly reconstruct the partial results.
      // If the coprocessor host is adding to the result list, we cannot guarantee the
      // correct ordering of partial results and so we prevent partial results from being
      // formed.
      boolean serverGuaranteesOrderOfPartials = results.isEmpty();
      boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
      boolean moreRows = false;

      // Heartbeat messages occur when the processing of the ScanRequest exceeds a
      // certain time threshold on the server. When the time threshold is exceeded, the
      // server stops the scan and sends back whatever Results it has accumulated within
      // that time period (may be empty). Since heartbeat messages have the potential to
      // create partial Results (in the event that the timeout occurs in the middle of a
      // row), we must only generate heartbeat messages when the client can handle both
      // heartbeats AND partials
      boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;

      long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);

      final LimitScope sizeScope =
        allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
      final LimitScope timeScope =
        allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;

      boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();

      // Configure with limits for this RPC. Set keep progress true since size progress
      // towards size limit should be kept between calls to nextRaw
      ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
      // maxResultSize - either we can reach this much size for all cells(being read) data or sum
      // of heap size occupied by cells(being read). Cell data means its key and value parts.
      // maxQuotaResultSize - max results just from server side configuration and quotas, without
      // user's specified max. We use this for evaluating limits based on blocks (not cells).
      // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
      // cell or block size from maximum here so we adhere to total limits of request.
      // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
      // have accumulated block bytes. If not, this will be 0 for block size.
      long maxCellSize = maxResultSize;
      long maxBlockSize = maxQuotaResultSize;
      if (rpcCall != null) {
        maxBlockSize -= rpcCall.getBlockBytesScanned();
        maxCellSize -= rpcCall.getResponseCellSize();
      }

      contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
      contextBuilder.setBatchLimit(scanner.getBatch());
      contextBuilder.setTimeLimit(timeScope, timeLimit);
      contextBuilder.setTrackMetrics(trackMetrics);
      ScannerContext scannerContext = contextBuilder.build();
      boolean limitReached = false;
      long blockBytesScannedBefore = 0;
      while (numOfResults < maxResults) {
        // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
        // batch limit is a limit on the number of cells per Result. Thus, if progress is
        // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
        // reset the batch progress between nextRaw invocations since we don't want the
        // batch progress from previous calls to affect future calls
        scannerContext.setBatchProgress(0);
        assert values.isEmpty();

        // Collect values to be returned here
        moreRows = scanner.nextRaw(values, scannerContext);

        // Per-iteration block bytes = total progress minus what previous iterations consumed.
        long blockBytesScanned = scannerContext.getBlockSizeProgress() - blockBytesScannedBefore;
        blockBytesScannedBefore = scannerContext.getBlockSizeProgress();

        if (rpcCall == null) {
          // When there is no RpcCallContext,copy EC to heap, then the scanner would close,
          // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
          // buffers.See more details in HBASE-26036.
          CellUtil.cloneIfNecessary(values);
        }
        numOfNextRawCalls++;

        if (!values.isEmpty()) {
          if (limitOfRows > 0) {
            // First we need to check if the last result is partial and we have a row change. If
            // so then we need to increase the numOfCompleteRows.
            if (results.isEmpty()) {
              if (
                rsh.rowOfLastPartialResult != null
                  && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
              ) {
                numOfCompleteRows++;
                checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                  builder);
              }
            } else {
              Result lastResult = results.get(results.size() - 1);
              if (
                lastResult.mayHaveMoreCellsInRow()
                  && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
              ) {
                numOfCompleteRows++;
                checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                  builder);
              }
            }
            if (builder.hasMoreResults() && !builder.getMoreResults()) {
              break;
            }
          }
          boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
          Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);

          if (request.getScan().getQueryMetricsEnabled()) {
            builder.addQueryMetrics(ClientProtos.QueryMetrics.newBuilder()
              .setBlockBytesScanned(blockBytesScanned).build());
          }

          results.add(r);
          numOfResults++;
          if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
            numOfCompleteRows++;
            checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
            if (builder.hasMoreResults() && !builder.getMoreResults()) {
              break;
            }
          }
        } else if (!moreRows && !results.isEmpty()) {
          // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
          // last result is false. Otherwise it's possible that: the first nextRaw returned
          // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
          // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
          // return with moreRows=false, which means moreResultsInRegion would be false, it will
          // be a contradictory state (HBASE-21206).
          int lastIdx = results.size() - 1;
          Result r = results.get(lastIdx);
          if (r.mayHaveMoreCellsInRow()) {
            results.set(lastIdx, ClientInternalHelper.createResult(
              ClientInternalHelper.getExtendedRawCells(r), r.getExists(), r.isStale(), false));
          }
        }
        boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
        boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
        boolean resultsLimitReached = numOfResults >= maxResults;
        limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

        if (limitReached || !moreRows) {
          // With block size limit, we may exceed size limit without collecting any results.
          // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
          // or cursor if results were collected, for example for cell size or heap size limits.
          boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
          // We only want to mark a ScanResponse as a heartbeat message in the event that
          // there are more values to be read server side. If there aren't more values,
          // marking it as a heartbeat is wasteful because the client will need to issue
          // another ScanRequest only to realize that they already have all the values
          if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
            // Heartbeat messages occur when the time limit has been reached, or size limit has
            // been reached before collecting any results. This can happen for heavily filtered
            // scans which scan over too many blocks.
            builder.setHeartbeatMessage(true);
            if (rsh.needCursor) {
              Cell cursorCell = scannerContext.getLastPeekedCell();
              if (cursorCell != null) {
                builder.setCursor(ProtobufUtil.toCursor(cursorCell));
              }
            }
          }
          break;
        }
        values.clear();
      }
      if (rpcCall != null) {
        rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
      }
      builder.setMoreResultsInRegion(moreRows);
      // Check to see if the client requested that we track metrics server side. If the
      // client requested metrics, retrieve the metrics from the scanner context.
      if (trackMetrics) {
        // rather than increment yet another counter in StoreScanner, just set the value here
        // from block size progress before writing into the response
        scannerContext.getMetrics().setCounter(
          ServerSideScanMetrics.BLOCK_BYTES_SCANNED_KEY_METRIC_NAME,
          scannerContext.getBlockSizeProgress());
        if (rpcCall != null) {
          scannerContext.getMetrics().setCounter(ServerSideScanMetrics.FS_READ_TIME_METRIC_NAME,
            rpcCall.getFsReadTime());
        }
        Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
        ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
        NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();

        for (Entry<String, Long> entry : metrics.entrySet()) {
          pairBuilder.setName(entry.getKey());
          pairBuilder.setValue(entry.getValue());
          metricBuilder.addMetrics(pairBuilder.build());
        }

        builder.setScanMetrics(metricBuilder.build());
      }
    }
  } finally {
    region.closeRegionOperation();
    // Update serverside metrics, even on error.
    long end = EnvironmentEdgeManager.currentTime();

    long responseCellSize = 0;
    long blockBytesScanned = 0;
    if (rpcCall != null) {
      responseCellSize = rpcCall.getResponseCellSize();
      blockBytesScanned = rpcCall.getBlockBytesScanned();
      rsh.updateBlockBytesScanned(blockBytesScanned);
    }
    region.getMetrics().updateScan();
    final MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned);
      metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls);
    }
  }
  // coprocessor postNext hook
  if (region.getCoprocessorHost() != null) {
    region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
  }
}

/**
 * Scan data in a table.
 * @param controller the RPC controller
 * @param request the scan request
 */
@Override
public ScanResponse scan(final RpcController controller, final ScanRequest request)
  throws ServiceException {
  if (controller != null && !(controller instanceof HBaseRpcController)) {
    throw new UnsupportedOperationException(
      "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
  }
  if (!request.hasScannerId() && !request.hasScan()) {
    throw new ServiceException(
      new DoNotRetryIOException("Missing required input: scannerId or scan"));
  }
  try {
    checkOpen();
  } catch (IOException e) {
    // Server is going down: best-effort cancel the scanner's lease before failing the call.
    if (request.hasScannerId()) {
      String scannerName = toScannerName(request.getScannerId());
      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "Server shutting down and client tried to access missing scanner " + scannerName);
      }
      final LeaseManager leaseManager = server.getLeaseManager();
      if (leaseManager != null) {
        try {
          leaseManager.cancelLease(scannerName);
        } catch (LeaseException le) {
          // No problem, ignore
          if (LOG.isTraceEnabled()) {
            LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
          }
        }
      }
    }
    throw new ServiceException(e);
  }
  requestCount.increment();
  rpcScanRequestCount.increment();
  RegionScannerContext rsx;
  ScanResponse.Builder builder = ScanResponse.newBuilder();
  try {
    rsx = checkQuotaAndGetRegionScannerContext(request, builder);
  } catch (IOException e) {
    // Identity comparison against the shared sentinel thrown for already-closed scanners.
    if (e == SCANNER_ALREADY_CLOSED) {
      // Now we will close scanner automatically if there are no more results for this region but
      // the old client will still send a close request to us. Just ignore it and return.
      return builder.build();
    }
    throw new ServiceException(e);
  }
  String scannerName = rsx.scannerName;
  RegionScannerHolder rsh = rsx.holder;
  OperationQuota quota = rsx.quota;
  if (rsh.fullRegionScan) {
    rpcFullScanRequestCount.increment();
  }
  HRegion region = rsh.r;
  LeaseManager.Lease lease;
  try {
    // Remove lease while its being processed in server; protects against case
    // where processing of request takes > lease expiration time. or null if none found.
3629 lease = server.getLeaseManager().removeLease(scannerName); 3630 } catch (LeaseException e) { 3631 throw new ServiceException(e); 3632 } 3633 if (request.hasRenew() && request.getRenew()) { 3634 // add back and return 3635 addScannerLeaseBack(lease); 3636 try { 3637 checkScanNextCallSeq(request, rsh); 3638 } catch (OutOfOrderScannerNextException e) { 3639 throw new ServiceException(e); 3640 } 3641 return builder.build(); 3642 } 3643 try { 3644 checkScanNextCallSeq(request, rsh); 3645 } catch (OutOfOrderScannerNextException e) { 3646 addScannerLeaseBack(lease); 3647 throw new ServiceException(e); 3648 } 3649 // Now we have increased the next call sequence. If we give client an error, the retry will 3650 // never success. So we'd better close the scanner and return a DoNotRetryIOException to client 3651 // and then client will try to open a new scanner. 3652 boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false; 3653 int rows; // this is scan.getCaching 3654 if (request.hasNumberOfRows()) { 3655 rows = request.getNumberOfRows(); 3656 } else { 3657 rows = closeScanner ? 0 : 1; 3658 } 3659 RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null); 3660 // now let's do the real scan. 3661 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize()); 3662 RegionScanner scanner = rsh.s; 3663 // this is the limit of rows for this scan, if we the number of rows reach this value, we will 3664 // close the scanner. 3665 int limitOfRows; 3666 if (request.hasLimitOfRows()) { 3667 limitOfRows = request.getLimitOfRows(); 3668 } else { 3669 limitOfRows = -1; 3670 } 3671 boolean scannerClosed = false; 3672 try { 3673 List<Result> results = new ArrayList<>(Math.min(rows, 512)); 3674 if (rows > 0) { 3675 boolean done = false; 3676 // Call coprocessor. Get region info from scanner. 
        // Give registered coprocessors a chance to observe or take over the next() call.
        if (region.getCoprocessorHost() != null) {
          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
          if (!results.isEmpty()) {
            for (Result r : results) {
              // add cell size from CP results so we can track response size and update limits
              // when calling scan below if !done. We'll also have tracked block size if the CP
              // got results from hbase, since StoreScanner tracks that for all calls automatically.
              addSize(rpcCall, r);
            }
          }
          // A TRUE bypass means the coprocessor produced the results itself, so skip the real
          // region scan below.
          if (bypass != null && bypass.booleanValue()) {
            done = true;
          }
        }
        if (!done) {
          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
            results, builder, rpcCall);
        } else {
          builder.setMoreResultsInRegion(!results.isEmpty());
        }
      } else {
        // This is an open scanner call with numberOfRow = 0, so set more results in region to
        // true.
        builder.setMoreResultsInRegion(true);
      }

      quota.addScanResult(results);
      addResults(builder, results, (HBaseRpcController) controller,
        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
        isClientCellBlockSupport(rpcCall));
      if (scanner.isFilterDone() && results.isEmpty()) {
        // If the scanner's filter - if any - is done with the scan
        // only set moreResults to false if the results is empty. This is used to keep compatible
        // with the old scan implementation where we just ignore the returned results if moreResults
        // is false. Can remove the isEmpty check after we get rid of the old implementation.
        builder.setMoreResults(false);
      }
      // Later we may close the scanner depending on this flag so here we need to make sure that we
      // have already set this flag.
      assert builder.hasMoreResultsInRegion();
      // we only set moreResults to false in the above code, so set it to true if we haven't set it
      // yet.
      if (!builder.hasMoreResults()) {
        builder.setMoreResults(true);
      }
      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
        // Record the last cell of the last result if it is a partial result
        // We need this to calculate the complete rows we have returned to client as the
        // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
        // current row. We may filter out all the remaining cells for the current row and just
        // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
        // check for row change.
        Result lastResult = results.get(results.size() - 1);
        if (lastResult.mayHaveMoreCellsInRow()) {
          rsh.rowOfLastPartialResult = lastResult.getRow();
        } else {
          rsh.rowOfLastPartialResult = null;
        }
      }
      // Close the scanner when the scan is exhausted (either flag false) or the client asked us to.
      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
        scannerClosed = true;
        closeScanner(region, scanner, scannerName, rpcCall, false);
      }

      // There's no point returning to a timed out client. Throwing ensures scanner is closed
      if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) {
        throw new TimeoutIOException("Client deadline exceeded, cannot return results");
      }

      return builder.build();
    } catch (IOException e) {
      try {
        // scanner is closed here
        scannerClosed = true;
        // The scanner state might be left in a dirty state, so we will tell the Client to
        // fail this RPC and close the scanner while opening up another one from the start of
        // row that the client has last seen.
        closeScanner(region, scanner, scannerName, rpcCall, true);

        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
        // used in two different semantics.
        // (1) The first is to close the client scanner and bubble up the exception all the way
        // to the application. This is preferred when the exception is really un-recoverable
        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
        // bucket usually.
        // (2) Second semantics is to close the current region scanner only, but continue the
        // client scanner by overriding the exception. This is usually UnknownScannerException,
        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
        // application-level ClientScanner has to continue without bubbling up the exception to
        // the client. See ClientScanner code to see how it deals with these special exceptions.
        if (e instanceof DoNotRetryIOException) {
          throw e;
        }

        // If it is a FileNotFoundException, wrap as a
        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
        if (e instanceof FileNotFoundException) {
          throw new DoNotRetryIOException(e);
        }

        // We closed the scanner already. Instead of throwing the IOException, and client
        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
        // a special exception to save an RPC.
        if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
          // 1.4.0+ clients know how to handle
          throw new ScannerResetException("Scanner is closed on the server-side", e);
        } else {
          // older clients do not know about SRE. Just throw USE, which they will handle
          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
            + " scanner state for clients older than 1.3.", e);
        }
      } catch (IOException ioe) {
        throw new ServiceException(ioe);
      }
    } finally {
      if (!scannerClosed) {
        // Adding resets expiration time on lease.
        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
        if (rpcCall != null) {
          rpcCall.setCallBack(rsh.shippedCallback);
        } else {
          // If context is null, here we call rsh.shippedCallback directly to reuse the logic in
          // rsh.shippedCallback to release the internal resources in rsh, and lease is also added
          // back to regionserver's LeaseManager in rsh.shippedCallback.
          runShippedCallback(rsh);
        }
      }
      quota.close();
    }
  }

  /**
   * Runs the holder's shipped callback immediately. Used when there is no RPC call context to
   * attach the callback to; the callback releases the internal resources held by the
   * {@link RegionScannerHolder}.
   * @throws ServiceException wrapping any {@link IOException} thrown by the callback
   */
  private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
    assert rsh.shippedCallback != null;
    try {
      rsh.shippedCallback.run();
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
  }

  /**
   * Closes the given scanner, giving coprocessors a chance to bypass the close.
   * <p>
   * When a coprocessor's preScannerClose hook returns true the actual close is skipped entirely.
   * Otherwise the holder is removed from the scanners map and the underlying scanner is closed —
   * either deferred via the call context's close callback (when a context is present) or
   * immediately. Scanners closed cleanly (isError == false) are recorded in closedScanners with
   * their next-call sequence number.
   * @param context the RPC call context; may be null
   * @param isError true when the close is due to an error, in which case the scanner is not
   *                recorded in closedScanners
   */
  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
    RpcCallContext context, boolean isError) throws IOException {
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preScannerClose(scanner)) {
        // bypass the actual close.
        return;
      }
    }
    RegionScannerHolder rsh = scanners.remove(scannerName);
    if (rsh != null) {
      if (context != null) {
        // Attach the holder's close callback to the call context; the actual close happens when
        // the callback runs, after the response has been handled.
        context.setCallBack(rsh.closeCallBack);
      } else {
        // No call context to defer to — close the underlying scanner right away.
        rsh.s.close();
      }
      if (region.getCoprocessorHost() != null) {
        region.getCoprocessorHost().postScannerClose(scanner);
      }
      if (!isError) {
        // Remember the next-call sequence of scanners that were closed cleanly (not due to error).
        closedScanners.put(scannerName, rsh.getNextCallSeq());
      }
    }
  }

  /**
   * Executes a region-server-level coprocessor endpoint call. Delegates to the server after the
   * standard RPC pre-checks.
   */
  @Override
  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
    CoprocessorServiceRequest request) throws ServiceException {
    rpcPreCheck("execRegionServerService");
    return server.execRegionServerService(controller, request);
  }

  /**
   * Returns the space quota snapshots currently known to this region server, one entry per table.
   * Returns an empty response when no {@link RegionServerSpaceQuotaManager} is available.
   */
  @Override
  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
    GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
    try {
      final RegionServerSpaceQuotaManager manager = server.getRegionServerSpaceQuotaManager();
      final GetSpaceQuotaSnapshotsResponse.Builder builder =
        GetSpaceQuotaSnapshotsResponse.newBuilder();
      if (manager != null) {
        // copyQuotaSnapshots gives us a copy, so iterating here is safe against concurrent updates.
        final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
        for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
            .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
            .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build());
        }
      }
      return builder.build();
    } catch (Exception e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Evicts the block cache entries for the requested regions, returning per-region eviction
   * statistics. Per-region failures are recorded in the stats rather than failing the whole call.
   */
  @Override
  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
    ClearRegionBlockCacheRequest request) throws ServiceException {
    try {
      rpcPreCheck("clearRegionBlockCache");
      ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder();
      CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
      server.getRegionServerCoprocessorHost().preClearRegionBlockCache();
      List<HRegion> regions = getRegions(request.getRegionList(), stats);
      for (HRegion region : regions) {
        try {
          stats = stats.append(this.server.clearRegionBlockCache(region));
        } catch (Exception e) {
          // Record the failure against this region and keep evicting the remaining ones.
          stats.addException(region.getRegionInfo().getRegionName(), e);
        }
      }
      // Max cache size defaults to 0 when no block cache is configured.
      stats.withMaxCacheSize(server.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
      server.getRegionServerCoprocessorHost().postClearRegionBlockCache(stats.build());
      return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Schedules an {@link AssignRegionHandler} for every region in the request whose open procedure
   * is accepted by the server.
   * @param tdCache caller-provided cache of TableDescriptors, used to avoid re-reading the same
   *                descriptor for multiple regions of one table
   */
  private void executeOpenRegionProcedures(OpenRegionRequest request,
    Map<TableName, TableDescriptor> tdCache) {
    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
    long initiatingMasterActiveTime =
      request.hasInitiatingMasterActiveTime() ? request.getInitiatingMasterActiveTime() : -1;
    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableName tableName = regionInfo.getTable();
      TableDescriptor tableDesc = tdCache.get(tableName);
      if (tableDesc == null) {
        try {
          tableDesc = server.getTableDescriptors().get(regionInfo.getTable());
        } catch (IOException e) {
          // Here we do not fail the whole method since we also need to deal with other
          // procedures, and we can not ignore this one, so we still schedule an
          // AssignRegionHandler and it will report back to master if we still can not get the
          // TableDescriptor.
          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
            regionInfo.getTable(), e);
        }
        if (tableDesc != null) {
          tdCache.put(tableName, tableDesc);
        }
      }
      if (regionOpenInfo.getFavoredNodesCount() > 0) {
        server.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
          regionOpenInfo.getFavoredNodesList());
      }
      long procId = regionOpenInfo.getOpenProcId();
      // Only schedule the handler when the server accepts the procedure id.
      if (server.submitRegionProcedure(procId)) {
        server.getExecutorService().submit(AssignRegionHandler.create(server, regionInfo, procId,
          tableDesc, masterSystemTime, initiatingMasterActiveTime));
      }
    }
  }

  /**
   * Schedules an {@link UnassignRegionHandler} for the region in the request when its close
   * procedure is accepted by the server.
   */
  private void executeCloseRegionProcedures(CloseRegionRequest request) {
    String encodedName;
    long initiatingMasterActiveTime =
      request.hasInitiatingMasterActiveTime() ? request.getInitiatingMasterActiveTime() : -1;
    try {
      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
    } catch (DoNotRetryIOException e) {
      // The region specifier in the request should always be decodable.
      throw new UncheckedIOException("Should not happen", e);
    }
    ServerName destination = request.hasDestinationServer()
      ?
        ProtobufUtil.toServerName(request.getDestinationServer())
      : null;
    long procId = request.getCloseProcId();
    boolean evictCache = request.getEvictCache();
    if (server.submitRegionProcedure(procId)) {
      server.getExecutorService().submit(UnassignRegionHandler.create(server, encodedName, procId,
        false, destination, evictCache, initiatingMasterActiveTime));
    }
  }

  /**
   * Instantiates and runs a single remote procedure sent by the master.
   * <p>
   * The procedure class is loaded reflectively from the request; an instantiation failure is
   * reported back to the master via remoteProcedureComplete instead of being thrown.
   */
  private void executeProcedures(RemoteProcedureRequest request) {
    RSProcedureCallable callable;
    try {
      callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
        .getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      // NOTE(review): the message grammar ("Failed to instantiating") is off, but it is a runtime
      // log string so it is left unchanged here.
      LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(),
        request.getProcId(), e);
      // Report the failure to the master so the procedure does not hang waiting on us.
      server.remoteProcedureComplete(request.getProcId(), request.getInitiatingMasterActiveTime(),
        e);
      return;
    }
    callable.init(request.getProcData().toByteArray(), server);
    LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
    server.executeProcedure(request.getProcId(), request.getInitiatingMasterActiveTime(), callable);
  }

  /**
   * Entry point for the master's ExecuteProcedures RPC: dispatches region open requests, region
   * close requests and generic remote procedures, bracketed by the region server coprocessor
   * pre/post hooks.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ExecuteProceduresResponse executeProcedures(RpcController controller,
    ExecuteProceduresRequest request) throws ServiceException {
    try {
      checkOpen();
      throwOnWrongStartCode(request);
      server.getRegionServerCoprocessorHost().preExecuteProcedures();
      if (request.getOpenRegionCount() > 0) {
        // Avoid re-reading the TableDescriptor every time (usually it will read from the file
        // system); the cache is shared across all open requests in this RPC.
        Map<TableName, TableDescriptor> tdCache = new HashMap<>();
        request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
      }
      if (request.getCloseRegionCount() > 0) {
        request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
      }
      if (request.getProcCount() > 0) {
        request.getProcList().forEach(this::executeProcedures);
      }
      server.getRegionServerCoprocessorHost().postExecuteProcedures();
      return ExecuteProceduresResponse.getDefaultInstance();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Returns all bootstrap nodes known to this server.
   */
  @Override
  public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller,
    GetAllBootstrapNodesRequest request) throws ServiceException {
    GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder();
    // NOTE(review): the lambda parameter shadows the 'server' field; consider renaming it for
    // clarity in a follow-up.
    server.getBootstrapNodes()
      .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server)));
    return builder.build();
  }

  /**
   * Re-reads the guardrail settings that support online (re)configuration: the batch row count
   * warn threshold, whether to reject batches over that threshold, and the server-side max
   * scanner result size.
   */
  private void setReloadableGuardrails(Configuration conf) {
    rowSizeWarnThreshold =
      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    rejectRowsWithSizeOverThreshold =
      conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
    maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
  }

  /**
   * Applies a live configuration change: delegates to the superclass, then reloads the
   * reloadable guardrails from the new configuration.
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    super.onConfigurationChange(conf);
    setReloadableGuardrails(conf);
  }

  /**
   * Returns the list of files that are fully cached in the block cache, or an empty list when no
   * block cache (or no fully-cached-files view) is available.
   */
  @Override
  public GetCachedFilesListResponse getCachedFilesList(RpcController controller,
    GetCachedFilesListRequest request) throws ServiceException {
    GetCachedFilesListResponse.Builder responseBuilder = GetCachedFilesListResponse.newBuilder();
    List<String> fullyCachedFiles = new ArrayList<>();
    server.getBlockCache().flatMap(BlockCache::getFullyCachedFiles).ifPresent(fcf -> {
      fullyCachedFiles.addAll(fcf.keySet());
    });
    return responseBuilder.addAllCachedFiles(fullyCachedFiles).build();
  }

  /**
   * Resolves the scanner and region for a ScanRequest and performs the scan quota check.
   * <p>
   * For a continuation call (the request carries a scanner id) the existing holder is looked up
   * and its block-bytes-scanned statistics feed the quota check; the scanner id is echoed back
   * into the response builder because downstream projects such as AsyncHBase in OpenTSDB depend
   * on it (see HBASE-18000). For an open call a new region scanner is created instead.
   * @return the scanner name, holder and checked quota bundled into a RegionScannerContext
   * @throws IOException if the region or scanner lookup fails, or the quota check rejects the scan
   */
  RegionScannerContext checkQuotaAndGetRegionScannerContext(ScanRequest request,
    ScanResponse.Builder builder) throws IOException {
    if (request.hasScannerId()) {
      // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
      // for more details.
      long scannerId = request.getScannerId();
      builder.setScannerId(scannerId);
      String scannerName = toScannerName(scannerId);
      RegionScannerHolder rsh = getRegionScanner(request);
      OperationQuota quota =
        getRpcQuotaManager().checkScanQuota(rsh.r, request, maxScannerResultSize,
          rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference());
      return new RegionScannerContext(scannerName, rsh, quota);
    }

    HRegion region = getRegion(request.getRegion());
    OperationQuota quota =
      getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize, 0L, 0L);
    Pair<String, RegionScannerHolder> pair = newRegionScanner(request, region, builder);
    return new RegionScannerContext(pair.getFirst(), pair.getSecond(), quota);
  }
}