001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.UncheckedIOException; 024import java.net.BindException; 025import java.net.InetAddress; 026import java.net.InetSocketAddress; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.Iterator; 032import java.util.List; 033import java.util.Map; 034import java.util.Map.Entry; 035import java.util.NavigableMap; 036import java.util.Set; 037import java.util.TreeSet; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentMap; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicBoolean; 042import java.util.concurrent.atomic.AtomicLong; 043import java.util.concurrent.atomic.LongAdder; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.fs.FileSystem; 046import org.apache.hadoop.fs.Path; 047import org.apache.hadoop.hbase.CacheEvictionStats; 048import 
org.apache.hadoop.hbase.CacheEvictionStatsBuilder; 049import org.apache.hadoop.hbase.Cell; 050import org.apache.hadoop.hbase.CellScanner; 051import org.apache.hadoop.hbase.CellUtil; 052import org.apache.hadoop.hbase.DoNotRetryIOException; 053import org.apache.hadoop.hbase.DroppedSnapshotException; 054import org.apache.hadoop.hbase.ExtendedCellScannable; 055import org.apache.hadoop.hbase.ExtendedCellScanner; 056import org.apache.hadoop.hbase.HBaseIOException; 057import org.apache.hadoop.hbase.HBaseRpcServicesBase; 058import org.apache.hadoop.hbase.HConstants; 059import org.apache.hadoop.hbase.MultiActionResultTooLarge; 060import org.apache.hadoop.hbase.NotServingRegionException; 061import org.apache.hadoop.hbase.PrivateCellUtil; 062import org.apache.hadoop.hbase.RegionTooBusyException; 063import org.apache.hadoop.hbase.Server; 064import org.apache.hadoop.hbase.ServerName; 065import org.apache.hadoop.hbase.TableName; 066import org.apache.hadoop.hbase.UnknownScannerException; 067import org.apache.hadoop.hbase.client.Append; 068import org.apache.hadoop.hbase.client.CheckAndMutate; 069import org.apache.hadoop.hbase.client.CheckAndMutateResult; 070import org.apache.hadoop.hbase.client.ClientInternalHelper; 071import org.apache.hadoop.hbase.client.Delete; 072import org.apache.hadoop.hbase.client.Durability; 073import org.apache.hadoop.hbase.client.Get; 074import org.apache.hadoop.hbase.client.Increment; 075import org.apache.hadoop.hbase.client.Mutation; 076import org.apache.hadoop.hbase.client.OperationWithAttributes; 077import org.apache.hadoop.hbase.client.Put; 078import org.apache.hadoop.hbase.client.QueryMetrics; 079import org.apache.hadoop.hbase.client.RegionInfo; 080import org.apache.hadoop.hbase.client.RegionReplicaUtil; 081import org.apache.hadoop.hbase.client.Result; 082import org.apache.hadoop.hbase.client.Row; 083import org.apache.hadoop.hbase.client.Scan; 084import org.apache.hadoop.hbase.client.TableDescriptor; 085import 
org.apache.hadoop.hbase.client.VersionInfoUtil; 086import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics; 087import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 088import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 089import org.apache.hadoop.hbase.exceptions.ScannerResetException; 090import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 091import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 092import org.apache.hadoop.hbase.io.ByteBuffAllocator; 093import org.apache.hadoop.hbase.io.hfile.BlockCache; 094import org.apache.hadoop.hbase.ipc.HBaseRpcController; 095import org.apache.hadoop.hbase.ipc.PriorityFunction; 096import org.apache.hadoop.hbase.ipc.QosPriority; 097import org.apache.hadoop.hbase.ipc.RpcCall; 098import org.apache.hadoop.hbase.ipc.RpcCallContext; 099import org.apache.hadoop.hbase.ipc.RpcCallback; 100import org.apache.hadoop.hbase.ipc.RpcServer; 101import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 102import org.apache.hadoop.hbase.ipc.RpcServerFactory; 103import org.apache.hadoop.hbase.ipc.RpcServerInterface; 104import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 105import org.apache.hadoop.hbase.ipc.ServerRpcController; 106import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics; 107import org.apache.hadoop.hbase.net.Address; 108import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 109import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; 110import org.apache.hadoop.hbase.quotas.OperationQuota; 111import org.apache.hadoop.hbase.quotas.QuotaUtil; 112import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 113import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 114import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 115import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; 116import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease; 117import 
org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException; 118import org.apache.hadoop.hbase.regionserver.Region.Operation; 119import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 120import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 121import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler; 122import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; 123import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; 124import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; 125import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; 126import org.apache.hadoop.hbase.replication.ReplicationUtils; 127import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker; 128import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker; 129import org.apache.hadoop.hbase.security.Superusers; 130import org.apache.hadoop.hbase.security.access.Permission; 131import org.apache.hadoop.hbase.util.Bytes; 132import org.apache.hadoop.hbase.util.DNS; 133import org.apache.hadoop.hbase.util.DNS.ServerType; 134import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 135import org.apache.hadoop.hbase.util.Pair; 136import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 137import org.apache.hadoop.hbase.wal.WAL; 138import org.apache.hadoop.hbase.wal.WALEdit; 139import org.apache.hadoop.hbase.wal.WALKey; 140import org.apache.hadoop.hbase.wal.WALSplitUtil; 141import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 142import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 143import org.apache.yetus.audience.InterfaceAudience; 144import org.slf4j.Logger; 145import org.slf4j.LoggerFactory; 146 147import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 148import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 149import 
org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 150import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 151import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 152import org.apache.hbase.thirdparty.com.google.protobuf.Message; 153import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 154import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 155import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 156import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 157import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 158 159import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 160import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 161import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 173import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 193import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 
214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 
236import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.BooleanMsg; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.EmptyMsg; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ManagedKeyEntryRequest; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; 249import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 250import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 251import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 252import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 253 254/** 255 * Implements the regionserver RPC services. 
 */
@InterfaceAudience.Private
public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
  implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface {

  private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);

  /** RPC scheduler to use for the region server. */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";

  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Upper bound on accumulated result size for a single scan RPC; read on the scan path
  // (see doNonAtomicRegionMutation, which combines it with the quota's max result size).
  // volatile — presumably refreshed on configuration change; confirm outside this chunk.
  private volatile long maxScannerResultSize;

  // Generates the ids under which open RegionScanners are registered in the scanners map.
  // NOTE(review): assigned outside this chunk — presumably during service startup; confirm.
  private ScannerIdGenerator scannerIdGenerator;

  // All currently open client scanners, keyed by scanner name/id.
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();

  // Hold the name and last sequence number of a closed scanner for a while. This is used
  // to keep compatible for old clients which may send next or close request to a region
  // scanner which has already been exhausted. The entries will be removed automatically
  // after scannerLeaseTimeoutPeriod.
  private final Cache<String, Long> closedScanners;

  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private volatile int rowSizeWarnThreshold;
  /*
   * Whether we should reject requests with very high no of rows i.e.
beyond threshold defined by 340 * rowSizeWarnThreshold 341 */ 342 private volatile boolean rejectRowsWithSizeOverThreshold; 343 344 final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false); 345 346 /** 347 * Services launched in RSRpcServices. By default they are on but you can use the below booleans 348 * to selectively enable/disable these services (Rare is the case where you would ever turn off 349 * one or the other). 350 */ 351 public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG = 352 "hbase.regionserver.admin.executorService"; 353 public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG = 354 "hbase.regionserver.client.executorService"; 355 public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG = 356 "hbase.regionserver.client.meta.executorService"; 357 public static final String REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG = 358 "hbase.regionserver.bootstrap.nodes.executorService"; 359 360 /** 361 * An Rpc callback for closing a RegionScanner. 362 */ 363 private static final class RegionScannerCloseCallBack implements RpcCallback { 364 365 private final RegionScanner scanner; 366 367 public RegionScannerCloseCallBack(RegionScanner scanner) { 368 this.scanner = scanner; 369 } 370 371 @Override 372 public void run() throws IOException { 373 this.scanner.close(); 374 } 375 } 376 377 /** 378 * An Rpc callback for doing shipped() call on a RegionScanner. 379 */ 380 private class RegionScannerShippedCallBack implements RpcCallback { 381 private final String scannerName; 382 private final Shipper shipper; 383 private final Lease lease; 384 385 public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) { 386 this.scannerName = scannerName; 387 this.shipper = shipper; 388 this.lease = lease; 389 } 390 391 @Override 392 public void run() throws IOException { 393 this.shipper.shipped(); 394 // We're done. On way out re-add the above removed lease. 
The lease was temp removed for this 395 // Rpc call and we are at end of the call now. Time to add it back. 396 if (scanners.containsKey(scannerName)) { 397 if (lease != null) { 398 server.getLeaseManager().addLease(lease); 399 } 400 } 401 } 402 } 403 404 /** 405 * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on 406 * completion of multiGets. 407 */ 408 static class RegionScannersCloseCallBack implements RpcCallback { 409 private final List<RegionScanner> scanners = new ArrayList<>(); 410 411 public void addScanner(RegionScanner scanner) { 412 this.scanners.add(scanner); 413 } 414 415 @Override 416 public void run() { 417 for (RegionScanner scanner : scanners) { 418 try { 419 scanner.close(); 420 } catch (IOException e) { 421 LOG.error("Exception while closing the scanner " + scanner, e); 422 } 423 } 424 } 425 } 426 427 static class RegionScannerContext { 428 final String scannerName; 429 final RegionScannerHolder holder; 430 final OperationQuota quota; 431 432 RegionScannerContext(String scannerName, RegionScannerHolder holder, OperationQuota quota) { 433 this.scannerName = scannerName; 434 this.holder = holder; 435 this.quota = quota; 436 } 437 } 438 439 /** 440 * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together. 
   */
  static final class RegionScannerHolder {
    // Sequence number the next scan request must present; CAS'd forward per call.
    private final AtomicLong nextCallSeq = new AtomicLong(0);
    private final RegionScanner s;
    private final HRegion r;
    private final RpcCallback closeCallBack;
    private final RpcCallback shippedCallback;
    // Row of the last partial Result returned, used to stitch partials across calls.
    private byte[] rowOfLastPartialResult;
    private boolean needCursor;
    private boolean fullRegionScan;
    // Captured for lease-expiry log messages (see toString()).
    private final String clientIPAndPort;
    private final String userName;
    // Block-bytes-scanned bookkeeping across calls. volatile for cross-thread visibility;
    // updateBlockBytesScanned is not atomic as a whole — presumably only one RPC call runs
    // against a holder at a time (enforced by the nextCallSeq CAS); confirm at call sites.
    private volatile long maxBlockBytesScanned = 0;
    private volatile long prevBlockBytesScanned = 0;
    private volatile long prevBlockBytesScannedDifference = 0;

    RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
      RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
      String clientIPAndPort, String userName) {
      this.s = s;
      this.r = r;
      this.closeCallBack = closeCallBack;
      this.shippedCallback = shippedCallback;
      this.needCursor = needCursor;
      this.fullRegionScan = fullRegionScan;
      this.clientIPAndPort = clientIPAndPort;
      this.userName = userName;
    }

    /** Returns the sequence number expected from the next scan request. */
    long getNextCallSeq() {
      return nextCallSeq.get();
    }

    /**
     * Advances the call sequence from {@code currentSeq} to {@code currentSeq + 1}.
     * @return false if another request already advanced it (i.e. a concurrent scan call)
     */
    boolean incNextCallSeq(long currentSeq) {
      // Use CAS to prevent multiple scan request running on the same scanner.
      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
    }

    /** Largest cumulative block-bytes-scanned value observed for any single call. */
    long getMaxBlockBytesScanned() {
      return maxBlockBytesScanned;
    }

    /** Block bytes scanned by the most recent call (delta from the call before it). */
    long getPrevBlockBytesScannedDifference() {
      return prevBlockBytesScannedDifference;
    }

    /**
     * Records the cumulative {@code blockBytesScanned} after a call: stores the per-call delta,
     * remembers the new cumulative value, and tracks the running maximum.
     */
    void updateBlockBytesScanned(long blockBytesScanned) {
      prevBlockBytesScannedDifference = blockBytesScanned - prevBlockBytesScanned;
      prevBlockBytesScanned = blockBytesScanned;
      if (blockBytesScanned > maxBlockBytesScanned) {
        maxBlockBytesScanned = blockBytesScanned;
      }
    }

    // Should be called only when we need to print lease expired messages otherwise
    // cache the String once made.
    @Override
    public String toString() {
      return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName
        + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString();
    }
  }

  /**
   * Instantiated as a scanner lease. If the lease times out, the scanner is closed
   */
  private class ScannerListener implements LeaseListener {
    private final String scannerName;

    ScannerListener(final String n) {
      this.scannerName = n;
    }

    @Override
    public void leaseExpired() {
      // Remove first so concurrent requests for this scanner name fail fast.
      RegionScannerHolder rsh = scanners.remove(this.scannerName);
      if (rsh == null) {
        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
        return;
      }
      LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
      server.getMetrics().incrScannerLeaseExpired();
      RegionScanner s = rsh.s;
      HRegion region = null;
      try {
        region = server.getRegion(s.getRegionInfo().getRegionName());
        // Give coprocessors their pre-close hook before actually closing.
        if (region != null && region.getCoprocessorHost() != null) {
          region.getCoprocessorHost().preScannerClose(s);
        }
      } catch (IOException e) {
        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
      } finally {
        // Close unconditionally, even if the pre-close hook failed, then run post-close.
        try {
          s.close();
          if (region != null && region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().postScannerClose(s);
          }
        } catch (IOException e) {
          LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
        }
      }
    }
  }

  /** Wraps a protobuf Result into an indexed ResultOrException for a multi response. */
  private static ResultOrException getResultOrException(final ClientProtos.Result r,
    final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(r), index);
  }

  /** Wraps an exception into an indexed ResultOrException for a multi response. */
  private static ResultOrException getResultOrException(final Exception e, final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(e), index);
  }

  /** Stamps the action index onto the builder and builds the final ResultOrException. */
  private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
    final int index)
{ 556 return builder.setIndex(index).build(); 557 } 558 559 /** 560 * Checks for the following pre-checks in order: 561 * <ol> 562 * <li>RegionServer is running</li> 563 * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li> 564 * </ol> 565 * @param requestName name of rpc request. Used in reporting failures to provide context. 566 * @throws ServiceException If any of the above listed pre-check fails. 567 */ 568 private void rpcPreCheck(String requestName) throws ServiceException { 569 try { 570 checkOpen(); 571 requirePermission(requestName, Permission.Action.ADMIN); 572 } catch (IOException ioe) { 573 throw new ServiceException(ioe); 574 } 575 } 576 577 private boolean isClientCellBlockSupport(RpcCallContext context) { 578 return context != null && context.isClientCellBlockSupported(); 579 } 580 581 private void addResult(final MutateResponse.Builder builder, final Result result, 582 final HBaseRpcController rpcc, boolean clientCellBlockSupported) { 583 if (result == null) return; 584 if (clientCellBlockSupported) { 585 builder.setResult(ProtobufUtil.toResultNoData(result)); 586 rpcc.setCellScanner(result.cellScanner()); 587 } else { 588 ClientProtos.Result pbr = ProtobufUtil.toResult(result); 589 builder.setResult(pbr); 590 } 591 } 592 593 private void addResults(ScanResponse.Builder builder, List<Result> results, 594 HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) { 595 builder.setStale(!isDefaultRegion); 596 if (results.isEmpty()) { 597 return; 598 } 599 if (clientCellBlockSupported) { 600 for (Result res : results) { 601 builder.addCellsPerResult(res.size()); 602 builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow()); 603 } 604 controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(results)); 605 } else { 606 for (Result res : results) { 607 ClientProtos.Result pbr = ProtobufUtil.toResult(res); 608 builder.addResults(pbr); 609 } 610 } 611 } 612 613 private CheckAndMutateResult 
checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
    CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Number of mutations whose cells have been fully consumed from cellScanner. Used in the
    // finally block to skip the cells of any remaining actions so the shared scanner stays
    // aligned for whatever is decoded after this call.
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      List<Mutation> mutations = new ArrayList<>();
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : actions) {
        // Only mutations are valid inside an atomic check-and-mutate batch.
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto mutation = action.getMutation();
        MutationType type = mutation.getMutateType();
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(mutation, cellScanner);
            // Bump the counter immediately after the cells are consumed, before any check
            // that may throw, so the finally block skips exactly the un-consumed actions.
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            mutations.add(put);
            break;
          case DELETE:
            Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
            ++countOfCompleteMutation;
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            mutations.add(del);
            break;
          case INCREMENT:
            Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
            // NOTE(review): the nonce of the last INCREMENT/APPEND in the batch wins —
            // presumably at most one nonce-carrying mutation is expected here; confirm.
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, increment);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
            mutations.add(increment);
            break;
          case APPEND:
            Append append = ProtobufUtil.toAppend(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, append);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
            mutations.add(append);
            break;
          default:
            throw new DoNotRetryIOException("invalid mutation type : " + type);
        }
      }

      if (mutations.size() == 0) {
        // A condition with no mutations: report success without touching the region.
        return new CheckAndMutateResult(true, null);
      } else {
        CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
        CheckAndMutateResult result = null;
        if (region.getCoprocessorHost() != null) {
          // A coprocessor may bypass the operation by returning a non-null result here.
          result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
        }
        if (result == null) {
          result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
          if (region.getCoprocessorHost() != null) {
            result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
          }
        }
        return result;
      }
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }

  /**
   * Execute an append mutation.
   * @param context RpcCallContext of the current call; may be null. Used to measure the block
   *          bytes scanned by this operation for metrics.
   * @return the append result; {@link Result#EMPTY_RESULT} if the region returned null (e.g. the
   *         default operation was bypassed as indicated by a RegionObserver)
   */
  private Result append(final HRegion region, final OperationQuota quota,
    final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
    checkCellSizeLimit(region, append);
    spaceQuota.getPolicyEnforcement(region).check(append);
    quota.addMutation(append);
    long blockBytesScannedBefore = context != null ?
      context.getBlockBytesScanned() : 0;
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    Result r = region.append(append, nonceGroup, nonce);
    if (server.getMetrics() != null) {
      // Report only the blocks read on behalf of this append, not the whole call.
      long blockBytesScanned =
        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
      server.getMetrics().updateAppend(region, EnvironmentEdgeManager.currentTime() - before,
        blockBytesScanned);
    }
    return r == null ? Result.EMPTY_RESULT : r;
  }

  /**
   * Execute an increment mutation.
   */
  private Result increment(final HRegion region, final OperationQuota quota,
    final MutationProto mutation, final CellScanner cells, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
    checkCellSizeLimit(region, increment);
    spaceQuota.getPolicyEnforcement(region).check(increment);
    quota.addMutation(increment);
    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    Result r = region.increment(increment, nonceGroup, nonce);
    final MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      // Report only the blocks read on behalf of this increment, not the whole call.
      long blockBytesScanned =
        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
      metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before,
        blockBytesScanned);
    }
    return r == null ? Result.EMPTY_RESULT : r;
  }

  /**
   * Run through the {@code actions} of a RegionAction and per Mutation, do the work, and then when
   * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
   * @param cellsToReturn Could be null. May be allocated in this method.
   * @param closeCallBack the callback to be used with multigets
   * @param context the current RpcCallContext
   * @return the {@code cellsToReturn} list (possibly allocated here) of results to be shipped back
   *         in the cell block, or null if none were produced
   */
  private List<ExtendedCellScannable> doNonAtomicRegionMutation(final HRegion region,
    final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
    final RegionActionResult.Builder builder, List<ExtendedCellScannable> cellsToReturn,
    long nonceGroup, final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
    // one at a time, we instead pass them in batch. Be aware that the corresponding
    // ResultOrException instance that matches each Put or Delete is then added down in the
    // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
    // deferred/batched
    List<ClientProtos.Action> mutations = null;
    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
    IOException sizeIOE = null;
    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
      ResultOrException.newBuilder();
    boolean hasResultOrException = false;
    for (ClientProtos.Action action : actions.getActionList()) {
      hasResultOrException = false;
      resultOrExceptionBuilder.clear();
      try {
        Result r = null;
        long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
        if (
          context != null && context.isRetryImmediatelySupported()
            && (context.getResponseCellSize() > maxQuotaResultSize
              || blockBytesScannedBefore + context.getResponseExceptionSize() > maxQuotaResultSize)
        ) {

          // We're storing the exception since the exception and reason string won't
          // change after the response size limit is reached.
          if (sizeIOE == null) {
            // We don't need the stack un-winding so don't throw the exception.
            // Throwing will kill the JVM's JIT.
            //
            // Instead just create the exception and then store it.
            sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
              + context.getResponseCellSize() + " BlockSize: " + blockBytesScannedBefore);

            // Only report the exception once since there's only one request that
            // caused the exception. Otherwise this number will dominate the exceptions count.
            rpcServer.getMetrics().exception(sizeIOE);
          }

          // Now that there's an exception is known to be created
          // use it for the response.
          //
          // This will create a copy in the builder.
          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
          resultOrExceptionBuilder.setException(pair);
          context.incrementResponseExceptionSize(pair.getSerializedSize());
          resultOrExceptionBuilder.setIndex(action.getIndex());
          builder.addResultOrException(resultOrExceptionBuilder.build());
          // Keep the shared cell scanner aligned even though this action is not executed.
          skipCellsForMutation(action, cellScanner);
          continue;
        }
        if (action.hasGet()) {
          long before = EnvironmentEdgeManager.currentTime();
          ClientProtos.Get pbGet = action.getGet();
          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
          // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
              + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
              + "reverse Scan.");
          }
          try {
            Get get = ProtobufUtil.toGet(pbGet);
            if (context != null) {
              r = get(get, (region), closeCallBack, context);
            } else {
              r = region.get(get);
            }
          } finally {
            // Record get latency/IO even when the get throws.
            final MetricsRegionServer metricsRegionServer = server.getMetrics();
            if (metricsRegionServer != null) {
              long blockBytesScanned =
                context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
              metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
                blockBytesScanned);
            }
          }
        } else if (action.hasServiceCall()) {
          hasResultOrException = true;
          Message result = execServiceOnRegion(region, action.getServiceCall());
          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
            ClientProtos.CoprocessorServiceResult.newBuilder();
          resultOrExceptionBuilder.setServiceResult(serviceResultBuilder
            .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName())
              // TODO: Copy!!!
              .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
        } else if (action.hasMutation()) {
          MutationType type = action.getMutation().getMutateType();
          if (
            type != MutationType.PUT && type != MutationType.DELETE && mutations != null
              && !mutations.isEmpty()
          ) {
            // Flush out any Puts or Deletes already collected.
            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
              spaceQuotaEnforcement);
            mutations.clear();
          }
          switch (type) {
            case APPEND:
              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement, context);
              break;
            case INCREMENT:
              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement, context);
              break;
            case PUT:
            case DELETE:
              // Collect the individual mutations and apply in a batch
              if (mutations == null) {
                mutations = new ArrayList<>(actions.getActionCount());
              }
              mutations.add(action);
              break;
            default:
              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
          }
        } else {
          throw new HBaseIOException("Unexpected Action type");
        }
        if (r != null) {
          ClientProtos.Result pbResult = null;
          if (isClientCellBlockSupport(context)) {
            pbResult = ProtobufUtil.toResultNoData(r);
            // Hard to guess the size here. Just make a rough guess.
            if (cellsToReturn == null) {
              cellsToReturn = new ArrayList<>();
            }
            cellsToReturn.add(r);
          } else {
            pbResult = ProtobufUtil.toResult(r);
          }
          addSize(context, r);
          hasResultOrException = true;
          resultOrExceptionBuilder.setResult(pbResult);
        }
        // Could get to here and there was no result and no exception. Presumes we added
        // a Put or Delete to the collecting Mutations List for adding later. In this
        // case the corresponding ResultOrException instance for the Put or Delete will be added
        // down in the doNonAtomicBatchOp method call rather than up here.
      } catch (IOException ie) {
        rpcServer.getMetrics().exception(ie);
        hasResultOrException = true;
        NameBytesPair pair = ResponseConverter.buildException(ie);
        resultOrExceptionBuilder.setException(pair);
        context.incrementResponseExceptionSize(pair.getSerializedSize());
      }
      if (hasResultOrException) {
        // Propagate index.
        resultOrExceptionBuilder.setIndex(action.getIndex());
        builder.addResultOrException(resultOrExceptionBuilder.build());
      }
    }
    // Finish up any outstanding mutations
    if (!CollectionUtils.isEmpty(mutations)) {
      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
    }
    return cellsToReturn;
  }

  /**
   * Reject any mutation containing a cell whose estimated serialized size exceeds the region's
   * configured {@code maxCellSize} limit (a limit of 0 or less disables the check).
   */
  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
    if (r.maxCellSize > 0) {
      CellScanner cells = m.cellScanner();
      while (cells.advance()) {
        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
        if (size > r.maxCellSize) {
          String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of "
            + r.maxCellSize + " bytes";
          LOG.debug(msg);
          throw new DoNotRetryIOException(msg);
        }
      }
    }
  }

  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Just throw the exception. The exception will be caught and then added to region-level
    // exception for RegionAction. Leaving the null to action result is ok since the null
    // result is viewed as failure by hbase client. And the region-level exception will be used
    // to replace the null result. see AsyncRequestFutureImpl#receiveMultiAction and
    // AsyncBatchRpcRetryingCaller#onComplete for more details.
    doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
  }

  /**
   * Execute a non-atomic batch; on IOException the failure is recorded per-action rather than
   * rethrown, since the mutations of one RegionAction may be split across several batches.
   */
  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    try {
      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
        spaceQuotaEnforcement, false);
    } catch (IOException e) {
      // Set the exception for each action. The mutations in the same RegionAction are grouped
      // into different batches and then processed individually. Hence, we don't set the
      // region-level exception here for the whole RegionAction.
      for (Action mutation : mutations) {
        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
      }
    }
  }

  /**
   * Execute a list of mutations.
   */
  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
    throws IOException {
    Mutation[] mArray = new Mutation[mutations.size()];
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      /**
       * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions
       * since mutation array may have been reordered. In order to return the right result or
       * exception to the corresponding actions, We need to know which action is the mutation belong
       * to. We can't sort ClientProtos.Action array, since they are bonded to cellscanners.
       */
      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
      int i = 0;
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : mutations) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto m = action.getMutation();
        Mutation mutation;
        switch (m.getMutateType()) {
          case PUT:
            mutation = ProtobufUtil.toPut(m, cells);
            batchContainsPuts = true;
            break;

          case DELETE:
            mutation = ProtobufUtil.toDelete(m, cells);
            batchContainsDelete = true;
            break;

          case INCREMENT:
            mutation = ProtobufUtil.toIncrement(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          case APPEND:
            mutation = ProtobufUtil.toAppend(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          default:
            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
        }
        mutationActionMap.put(mutation, action);
        mArray[i++] = mutation;
        checkCellSizeLimit(region, mutation);
        // Check if a space quota disallows this mutation
        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
        quota.addMutation(mutation);
      }

      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }

      // HBASE-17924
      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
      // order is preserved as its expected from the client
      if (!atomic) {
        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
      }

      OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);

      // When atomic is true, it indicates that the mutateRow API or the batch API with
      // RowMutations is called. In this case, we need to merge the results of the
      // Increment/Append operations if the mutations include those operations, and set the merged
      // result to the first element of the ResultOrException list
      if (atomic) {
        List<ResultOrException> resultOrExceptions = new ArrayList<>();
        List<Result> results = new ArrayList<>();
        for (i = 0; i < codes.length; i++) {
          if (codes[i].getResult() != null) {
            results.add(codes[i].getResult());
          }
          if (i != 0) {
            resultOrExceptions
              .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i));
          }
        }

        if (results.isEmpty()) {
          builder.addResultOrException(
            getResultOrException(ClientProtos.Result.getDefaultInstance(), 0));
        } else {
          // Merge the results of the Increment/Append operations
          List<Cell> cellList = new ArrayList<>();
          for (Result result : results) {
            if (result.rawCells() != null) {
              cellList.addAll(Arrays.asList(result.rawCells()));
            }
          }
          Result result = Result.create(cellList);

          // Set the merged result of the Increment/Append operations to the first element of the
          // ResultOrException list
          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
        }

        builder.addAllResultOrException(resultOrExceptions);
        return;
      }

      for (i = 0; i < codes.length; i++) {
        Mutation currentMutation = mArray[i];
        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
        Exception e;
        // Note: the 'default' label appearing before SUCCESS/STORE_TOO_BUSY is legal Java;
        // cases are matched by label, not by position.
        switch (codes[i].getOperationStatusCode()) {
          case BAD_FAMILY:
            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SANITY_CHECK_FAILURE:
            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          default:
            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SUCCESS:
            ClientProtos.Result result = codes[i].getResult() == null
              ? ClientProtos.Result.getDefaultInstance()
              : ProtobufUtil.toResult(codes[i].getResult());
            builder.addResultOrException(getResultOrException(result, index));
            break;

          case STORE_TOO_BUSY:
            e = new RegionTooBusyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;
        }
      }
    } finally {
      int processedMutationIndex = 0;
      for (Action mutation : mutations) {
        // The non-null mArray[i] means the cell scanner has been read.
        if (mArray[processedMutationIndex++] == null) {
          skipCellsForMutation(mutation, cells);
        }
      }
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  /** Update put/delete batch latency metrics, if a metrics source is present. */
  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
    boolean batchContainsDelete) {
    final MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      if (batchContainsPuts) {
        metricsRegionServer.updatePutBatch(region, after - starttime);
      }
      if (batchContainsDelete) {
        metricsRegionServer.updateDeleteBatch(region, after - starttime);
      }
    }
  }

  /**
   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
   *         exceptionMessage if any
   * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying
   *             edits for secondary replicas any more, see
   *             {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}.
   */
  @Deprecated
  private OperationStatus[] doReplayBatchOp(final HRegion region,
    final List<MutationReplay> mutations, long replaySeqId) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
        MutationReplay m = it.next();

        if (m.getType() == MutationType.PUT) {
          batchContainsPuts = true;
        } else {
          batchContainsDelete = true;
        }

        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
        if (metaCells != null && !metaCells.isEmpty()) {
          for (Cell metaCell : metaCells) {
            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
            HRegion hRegion = region;
            if (compactionDesc != null) {
              // replay the compaction. Remove the files from stores only if we are the primary
              // region replica (thus own the files)
              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
                replaySeqId);
              continue;
            }
            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
            if (flushDesc != null && !isDefaultReplica) {
              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
              continue;
            }
            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
            if (regionEvent != null && !isDefaultReplica) {
              hRegion.replayWALRegionEventMarker(regionEvent);
              continue;
            }
            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
            if (bulkLoadEvent != null) {
              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
              continue;
            }
          }
          // NOTE(review): any entry carrying METAFAMILY cells is dropped from the batch after
          // the markers are replayed, so it is not re-applied as a data mutation below.
          it.remove();
        }
      }
      requestCount.increment();
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
        replaySeqId);
    } finally {
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  private void closeAllScanners() {
    // Close any outstanding scanners. Means they'll get an UnknownScanner
    // exception next time they come in.
    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
      try {
        e.getValue().s.close();
      } catch (IOException ioe) {
        // Best-effort shutdown: log and keep closing the remaining scanners.
        LOG.warn("Closing scanner " + e.getKey(), ioe);
      }
    }
  }

  // Directly invoked only for testing
  public RSRpcServices(final HRegionServer rs) throws IOException {
    super(rs, rs.getProcessName());
    final Configuration conf = rs.getConfiguration();
    setReloadableGuardrails(conf);
    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
    rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
    // Closed-scanner ids are remembered for one lease period so late-arriving calls can be
    // answered distinctly from never-known scanner ids.
    closedScanners = CacheBuilder.newBuilder()
      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }

  @Override
  protected boolean defaultReservoirEnabled() {
    return true;
  }

  @Override
  protected ServerType getDNSServerType() {
    return DNS.ServerType.REGIONSERVER;
  }

  @Override
  protected String getHostname(Configuration conf, String defaultHostname) {
    return conf.get("hbase.regionserver.ipc.address", defaultHostname);
  }

  @Override
  protected String getPortConfigName() {
    return HConstants.REGIONSERVER_PORT;
  }

  @Override
  protected int getDefaultPort() {
    return HConstants.DEFAULT_REGIONSERVER_PORT;
  }

  @Override
  protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  protected RpcServerInterface createRpcServer(final Server server,
    final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
    final String name) throws IOException {
    final Configuration conf = server.getConfiguration();
    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
    try {
      // use final bindAddress for this server.
      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress,
        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
    } catch (BindException be) {
      throw new IOException(be.getMessage() + ". To switch ports use the '"
        + HConstants.REGIONSERVER_PORT + "' configuration property.",
        be.getCause() != null ? be.getCause() : be);
    }
  }

  protected Class<?> getRpcSchedulerFactoryClass() {
    final Configuration conf = server.getConfiguration();
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  protected PriorityFunction createPriority() {
    return new RSAnnotationReadingPriorityFunction(this);
  }

  /** Returns the number of currently outstanding region scanners. */
  public int getScannersCount() {
    return scanners.size();
  }

  /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
  RegionScanner getScanner(long scannerId) {
    RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId);
    return rsh == null ? null : rsh.s;
  }

  /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null.
   */
  private RegionScannerHolder checkQuotaAndGetRegionScannerContext(long scannerId) {
    return scanners.get(toScannerName(scannerId));
  }

  /** Returns a human-readable "table/region/operation_id" summary for the scanner, or null. */
  public String getScanDetailsWithId(long scannerId) {
    RegionScanner scanner = getScanner(scannerId);
    if (scanner == null) {
      return null;
    }
    StringBuilder builder = new StringBuilder();
    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
    builder.append(" operation_id: ").append(scanner.getOperationId());
    return builder.toString();
  }

  /** Returns a human-readable summary for the scan request, or null if it cannot be resolved. */
  public String getScanDetailsWithRequest(ScanRequest request) {
    try {
      if (!request.hasRegion()) {
        return null;
      }
      Region region = getRegion(request.getRegion());
      StringBuilder builder = new StringBuilder();
      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
      for (NameBytesPair pair : request.getScan().getAttributeList()) {
        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
          break;
        }
      }
      return builder.toString();
    } catch (IOException ignored) {
      // This is diagnostics-only; a lookup failure just means "no details available".
      return null;
    }
  }

  /**
   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
   */
  long getScannerVirtualTime(long scannerId) {
    RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId);
    return rsh == null ? 0L : rsh.getNextCallSeq();
  }

  /**
   * Method to account for the size of retained cells. Adds the estimated serialized size of every
   * cell in <code>r</code> to the context's response cell size.
   * @param context rpc call context
   * @param r result to add size.
   */
  void addSize(RpcCallContext context, Result r) {
    if (context != null && r != null && !r.isEmpty()) {
      for (Cell c : r.rawCells()) {
        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
      }
    }
  }

  /** Returns Remote client's ip and port, else the empty string if it can't be determined. */
  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
    link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
  static String getRemoteClientIpAndPort() {
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    if (rpcCall == null) {
      return HConstants.EMPTY_STRING;
    }
    InetAddress address = rpcCall.getRemoteAddress();
    if (address == null) {
      return HConstants.EMPTY_STRING;
    }
    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
    // scanning than a hostname anyways.
    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
  }

  /** Returns Remote client's username, or the empty string if there is no current call/user. */
  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
    link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
  static String getUserName() {
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    if (rpcCall == null) {
      return HConstants.EMPTY_STRING;
    }
    return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING);
  }

  /**
   * Register a new scanner: create its expiry lease, wire up shipped/close callbacks, and record
   * it in the scanners map under <code>scannerName</code>.
   */
  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
    HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
    Lease lease = server.getLeaseManager().createLease(scannerName, this.scannerLeaseTimeoutPeriod,
      new ScannerListener(scannerName));
    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
    // A scanner that is itself an RpcCallback handles its own close; otherwise wrap it.
    RpcCallback closeCallback =
      s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s);
    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
      needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
      + scannerName + ", " + existing;
    return rsh;
  }

  private boolean isFullRegionScan(Scan scan, HRegion region) {
    // If the scan start row equals or less than the start key of the region
    // and stop row greater than equals end key (if stop row present)
    // or if the stop row is empty
    // account this as a full region scan
    if (
      Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0
        && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0
          && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)
          || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
    ) {
      return true;
    }
    return false;
  }

  /**
   * Find the HRegion based on a region specifier
   * @param regionSpecifier the region specifier
   * @return the corresponding region
   * @throws IOException if the specifier is not null, but failed to find the region
   */
  public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException {
    return server.getRegion(regionSpecifier.getValue().toByteArray());
  }

  /**
   * Find the List of HRegions based on a list of region specifiers
   * @param regionSpecifiers the list of region specifiers
   * @return the corresponding list of regions
   * @throws IOException if any of the specifiers is not null, but failed to find the region
   */
  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
    final CacheEvictionStatsBuilder stats) {
    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
    for (RegionSpecifier regionSpecifier : regionSpecifiers) {
      try {
        regions.add(server.getRegion(regionSpecifier.getValue().toByteArray()));
      } catch (NotServingRegionException e) {
        // Record the miss in the stats instead of failing the whole lookup.
        stats.addException(regionSpecifier.getValue().toByteArray(), e);
      }
    }
    return regions;
  }

  // Exposes the function used to grade the priority of incoming RPC calls.
  PriorityFunction getPriority() {
    return priority;
  }

  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return server.getRegionServerRpcQuotaManager();
  }

  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return server.getRegionServerSpaceQuotaManager();
  }

  // Initializes the scanner id generator from the server name, then delegates the rest of the
  // startup to the base class.
  void start(ZKWatcher zkWatcher) {
    this.scannerIdGenerator = new ScannerIdGenerator(this.server.getServerName());
    internalStart(zkWatcher);
  }

  // Closes all open scanners before shutting down the underlying RPC machinery.
  void stop() {
    closeAllScanners();
    internalStop();
  }

  /**
   * Called to verify that this server is up and running. Throws the most specific failure:
   * aborted, stopped, filesystem unavailable, or simply not finished starting yet.
   */
  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
  protected void checkOpen() throws IOException {
    if (server.isAborted()) {
      throw new RegionServerAbortedException("Server " + server.getServerName() + " aborting");
    }
    if (server.isStopped()) {
      throw new RegionServerStoppedException("Server " + server.getServerName() + " stopping");
    }
    if (!server.isDataFileSystemOk()) {
      throw new RegionServerStoppedException("File system not available");
    }
    if (!server.isOnline()) {
      throw new ServerNotRunningYetException(
        "Server " + server.getServerName() + " is not running yet");
    }
  }

  /**
   * By default, put up an Admin and a Client Service. Set booleans
   * <code>hbase.regionserver.admin.executorService</code> and
   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
   * Default is that both are enabled.
   * @return immutable list of blocking services and the security info classes that this server
   *         supports
   */
  protected List<BlockingServiceAndInterface> getServices() {
    // Each service can be toggled independently via configuration; all default to enabled.
    boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
    boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
    boolean clientMeta =
      getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
    boolean bootstrapNodes =
      getConfiguration().getBoolean(REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG, true);
    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
    if (client) {
      bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
        ClientService.BlockingInterface.class));
    }
    if (admin) {
      bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
        AdminService.BlockingInterface.class));
    }
    if (clientMeta) {
      bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
        ClientMetaService.BlockingInterface.class));
    }
    if (bootstrapNodes) {
      bssi.add(
        new BlockingServiceAndInterface(BootstrapNodeService.newReflectiveBlockingService(this),
          BootstrapNodeService.BlockingInterface.class));
    }
    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
  }

  /**
   * Close a region on the region server.
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public CloseRegionResponse closeRegion(final RpcController controller,
    final CloseRegionRequest request) throws ServiceException {
    // Optional destination: when set, the close is part of a move to another server.
    final ServerName sn = (request.hasDestinationServer()
      ? ProtobufUtil.toServerName(request.getDestinationServer())
      : null);

    try {
      checkOpen();
      throwOnWrongStartCode(request);
      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());

      requestCount.increment();
      if (sn == null) {
        LOG.info("Close " + encodedRegionName + " without moving");
      } else {
        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
      }
      boolean closed = server.closeRegion(encodedRegionName, false, sn);
      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Compact a region on the region server.
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public CompactRegionResponse compactRegion(final RpcController controller,
    final CompactRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      // Quota support is enabled, the requesting user is not system/super user
      // and a quota policy is enforced that disables compactions.
      if (
        QuotaUtil.isQuotaEnabled(getConfiguration())
          && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null))
          && this.server.getRegionServerSpaceQuotaManager()
            .areCompactionsDisabled(region.getTableDescriptor().getTableName())
      ) {
        throw new DoNotRetryIOException(
          "Compactions on this region are " + "disabled due to a space quota violation.");
      }
      // Guard the compaction request with a region operation so closes cannot race it.
      region.startRegionOperation(Operation.COMPACT_REGION);
      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
      boolean major = request.hasMajor() && request.getMajor();
      if (request.hasFamily()) {
        // Single-family compaction requested.
        byte[] family = request.getFamily().toByteArray();
        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
          + region.getRegionInfo().getRegionNameAsString() + " and family "
          + Bytes.toString(family);
        LOG.trace(log);
        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
          CompactionLifeCycleTracker.DUMMY);
      } else {
        // Whole-region compaction across all families.
        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
          + region.getRegionInfo().getRegionNameAsString();
        LOG.trace(log);
        region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
      }
      return CompactRegionResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  // Enables or disables compactions server-wide and reports the previous state. A no-op when the
  // requested state matches the current one.
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public CompactionSwitchResponse compactionSwitch(RpcController controller,
    CompactionSwitchRequest request) throws ServiceException {
    rpcPreCheck("compactionSwitch");
    final CompactSplit compactSplitThread = server.getCompactSplitThread();
    requestCount.increment();
    boolean prevState = compactSplitThread.isCompactionsEnabled();
    CompactionSwitchResponse response =
      CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
    if (prevState == request.getEnabled()) {
      // passed in requested state is same as current state. No action required
      return response;
    }
    compactSplitThread.switchCompaction(request.getEnabled());
    return response;
  }

  /**
   * Flush a region on the region server.
1618 * @param controller the RPC controller 1619 * @param request the request 1620 */ 1621 @Override 1622 @QosPriority(priority = HConstants.ADMIN_QOS) 1623 public FlushRegionResponse flushRegion(final RpcController controller, 1624 final FlushRegionRequest request) throws ServiceException { 1625 try { 1626 checkOpen(); 1627 requestCount.increment(); 1628 HRegion region = getRegion(request.getRegion()); 1629 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); 1630 boolean shouldFlush = true; 1631 if (request.hasIfOlderThanTs()) { 1632 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs(); 1633 } 1634 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); 1635 if (shouldFlush) { 1636 boolean writeFlushWalMarker = 1637 request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false; 1638 // Go behind the curtain so we can manage writing of the flush WAL marker 1639 HRegion.FlushResultImpl flushResult = null; 1640 if (request.hasFamily()) { 1641 List<byte[]> families = new ArrayList(); 1642 families.add(request.getFamily().toByteArray()); 1643 TableDescriptor tableDescriptor = region.getTableDescriptor(); 1644 List<String> noSuchFamilies = families.stream() 1645 .filter(f -> !tableDescriptor.hasColumnFamily(f)).map(Bytes::toString).toList(); 1646 if (!noSuchFamilies.isEmpty()) { 1647 throw new NoSuchColumnFamilyException("Column families " + noSuchFamilies 1648 + " don't exist in table " + tableDescriptor.getTableName().getNameAsString()); 1649 } 1650 flushResult = 1651 region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1652 } else { 1653 flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1654 } 1655 boolean compactionNeeded = flushResult.isCompactionNeeded(); 1656 if (compactionNeeded) { 1657 server.getCompactSplitThread().requestSystemCompaction(region, 1658 "Compaction through user triggered flush"); 1659 } 1660 
builder.setFlushed(flushResult.isFlushSucceeded()); 1661 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker); 1662 } 1663 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores()); 1664 return builder.build(); 1665 } catch (DroppedSnapshotException ex) { 1666 // Cache flush can fail in a few places. If it fails in a critical 1667 // section, we get a DroppedSnapshotException and a replay of wal 1668 // is required. Currently the only way to do this is a restart of 1669 // the server. 1670 server.abort("Replay of WAL required. Forcing server shutdown", ex); 1671 throw new ServiceException(ex); 1672 } catch (IOException ie) { 1673 throw new ServiceException(ie); 1674 } 1675 } 1676 1677 @Override 1678 @QosPriority(priority = HConstants.ADMIN_QOS) 1679 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, 1680 final GetOnlineRegionRequest request) throws ServiceException { 1681 try { 1682 checkOpen(); 1683 requestCount.increment(); 1684 Map<String, HRegion> onlineRegions = server.getOnlineRegions(); 1685 List<RegionInfo> list = new ArrayList<>(onlineRegions.size()); 1686 for (HRegion region : onlineRegions.values()) { 1687 list.add(region.getRegionInfo()); 1688 } 1689 list.sort(RegionInfo.COMPARATOR); 1690 return ResponseConverter.buildGetOnlineRegionResponse(list); 1691 } catch (IOException ie) { 1692 throw new ServiceException(ie); 1693 } 1694 } 1695 1696 // Master implementation of this Admin Service differs given it is not 1697 // able to supply detail only known to RegionServer. See note on 1698 // MasterRpcServers#getRegionInfo. 
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
    final GetRegionInfoRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      RegionInfo info = region.getRegionInfo();
      byte[] bestSplitRow;
      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
        bestSplitRow = region.checkSplit(true).orElse(null);
        // when all table data are in memstore, bestSplitRow = null
        // try to flush region first
        if (bestSplitRow == null) {
          region.flush(true);
          bestSplitRow = region.checkSplit(true).orElse(null);
        }
      } else {
        bestSplitRow = null;
      }
      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
      if (request.hasCompactionState() && request.getCompactionState()) {
        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
      }
      builder.setSplittable(region.isSplittable());
      builder.setMergeable(region.isMergeable());
      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  // Builds a RegionLoad entry for every online region, or only for the regions of the requested
  // table when one is named in the request.
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
    throws ServiceException {

    List<HRegion> regions;
    if (request.hasTableName()) {
      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
      regions = server.getRegions(tableName);
    } else {
      regions = server.getRegions();
    }
    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
    // Builders are reused across iterations; createRegionLoad is expected to reset them per call
    // — TODO confirm against HRegionServer#createRegionLoad.
    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();

    try {
      for (HRegion region : regions) {
        rLoads.add(server.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
      }
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
    builder.addAllRegionLoads(rLoads);
    return builder.build();
  }

  // Clears the long and/or short compaction queues. Guarded by an atomic flag so only one admin
  // can run the operation at a time; concurrent callers get a warning and a no-op response.
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
    ClearCompactionQueuesRequest request) throws ServiceException {
    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
      + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
    requestCount.increment();
    if (clearCompactionQueues.compareAndSet(false, true)) {
      final CompactSplit compactSplitThread = server.getCompactSplitThread();
      try {
        checkOpen();
        server.getRegionServerCoprocessorHost().preClearCompactionQueues();
        for (String queueName : request.getQueueNameList()) {
          LOG.debug("clear " + queueName + " compaction queue");
          switch (queueName) {
            case "long":
              compactSplitThread.clearLongCompactionsQueue();
              break;
            case "short":
              compactSplitThread.clearShortCompactionsQueue();
              break;
            default:
              LOG.warn("Unknown queue name " + queueName);
              throw new IOException("Unknown queue name " + queueName);
          }
        }
        server.getRegionServerCoprocessorHost().postClearCompactionQueues();
      } catch (IOException ie) {
        throw new ServiceException(ie);
      } finally {
        // Always release the guard so a failed attempt does not block future requests.
        clearCompactionQueues.set(false);
      }
    } else {
      LOG.warn("Clear compactions queue is executing by other admin.");
    }
    return respBuilder.build();
  }

  /**
   * Get some information of the region server.
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetServerInfoResponse getServerInfo(final RpcController controller,
    final GetServerInfoRequest request) throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
    requestCount.increment();
    // -1 signals that no info server is running.
    int infoPort = server.getInfoServer() != null ? server.getInfoServer().getPort() : -1;
    return ResponseConverter.buildGetServerInfoResponse(server.getServerName(), infoPort);
  }

  // Lists store file paths for the requested region, restricted to the requested column families
  // (all families when none are given).
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetStoreFileResponse getStoreFile(final RpcController controller,
    final GetStoreFileRequest request) throws ServiceException {
    try {
      checkOpen();
      HRegion region = getRegion(request.getRegion());
      requestCount.increment();
      Set<byte[]> columnFamilies;
      if (request.getFamilyCount() == 0) {
        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
      } else {
        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
        for (ByteString cf : request.getFamilyList()) {
          columnFamilies.add(cf.toByteArray());
        }
      }
      int nCF = columnFamilies.size();
      List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
      builder.addAllStoreFile(fileList);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  // Verifies the request targets this server incarnation; requests without a start code are let
  // through with a warning for compatibility.
  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  // Same start-code verification as above, for close requests.
  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
    // check that we are the same server that this RPC is intended for.
    if (server.getServerName().getStartcode() != serverStartCode) {
      throw new ServiceException(new DoNotRetryIOException(
        "This RPC was intended for a " + "different server with startCode: " + serverStartCode
          + ", this server is: " + server.getServerName()));
    }
  }

  // Applies the start-code check to every open and close sub-request of a procedure batch.
  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
    if (req.getOpenRegionCount() > 0) {
      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
        throwOnWrongStartCode(openReq);
      }
    }
    if (req.getCloseRegionCount() > 0) {
      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
        throwOnWrongStartCode(closeReq);
      }
    }
  }

  /**
   * Open asynchronously a region or a set of regions on the region server. The opening is
   * coordinated by ZooKeeper, and this method requires the znode to be created before being called.
   * As a consequence, this method should be called only from the master.
   * <p>
   * Different manages states for the region are:
   * </p>
   * <ul>
   * <li>region not opened: the region opening will start asynchronously.</li>
   * <li>a close is already in progress: this is considered as an error.</li>
   * <li>an open is already in progress: this new open request will be ignored.
This is important
   * because the Master can do multiple requests if it crashes.</li>
   * <li>the region is already opened: this new open request will be ignored.</li>
   * </ul>
   * <p>
   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
   * errors are put in the response as FAILED_OPENING.
   * </p>
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public OpenRegionResponse openRegion(final RpcController controller,
    final OpenRegionRequest request) throws ServiceException {
    requestCount.increment();
    throwOnWrongStartCode(request);

    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
    final int regionCount = request.getOpenInfoCount();
    // Cache of table descriptors so a bulk assign resolves each table only once.
    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
    final boolean isBulkAssign = regionCount > 1;
    try {
      checkOpen();
    } catch (IOException ie) {
      // Only a single-region open of meta gets the grace period below; everything else fails fast.
      TableName tableName = null;
      if (regionCount == 1) {
        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
          request.getOpenInfo(0).getRegion();
        if (ri != null) {
          tableName = ProtobufUtil.toTableName(ri.getTableName());
        }
      }
      if (!TableName.META_TABLE_NAME.equals(tableName)) {
        throw new ServiceException(ie);
      }
      // We are assigning meta, wait a little for regionserver to finish initialization.
      // Default to quarter of RPC timeout
      int timeout = server.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
      synchronized (server.online) {
        try {
          while (
            EnvironmentEdgeManager.currentTime() <= endTime && !server.isStopped()
              && !server.isOnline()
          ) {
            server.online.wait(server.getMsgInterval());
          }
          checkOpen();
        } catch (InterruptedException t) {
          // Restore the interrupt status before propagating.
          Thread.currentThread().interrupt();
          throw new ServiceException(t);
        } catch (IOException e) {
          throw new ServiceException(e);
        }
      }
    }

    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;

    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableDescriptor htd;
      try {
        String encodedName = region.getEncodedName();
        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
        final HRegion onlineRegion = server.getRegion(encodedName);
        if (onlineRegion != null) {
          // The region is already online. This should not happen any more.
          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
            + ", which is already online";
          LOG.warn(error);
          // server.abort(error);
          // throw new IOException(error);
          builder.addOpeningState(RegionOpeningState.OPENED);
          continue;
        }
        LOG.info("Open " + region.getRegionNameAsString());

        // Mark the region as in transition; 'previous' tells us whether an open (TRUE) or a
        // close (FALSE) was already underway.
        final Boolean previous =
          server.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);

        if (Boolean.FALSE.equals(previous)) {
          if (server.getRegion(encodedName) != null) {
            // There is a close in progress. This should not happen any more.
            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
              + ", which we are already trying to CLOSE";
            server.abort(error);
            throw new IOException(error);
          }
          server.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
        }

        if (Boolean.TRUE.equals(previous)) {
          // An open is in progress. This is supported, but let's log this.
          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
            + ", which we are already trying to OPEN"
            + " - ignoring this new request for this region.");
        }

        // We are opening this region. If it moves back and forth for whatever reason, we don't
        // want to keep returning the stale moved record while we are opening/if we close again.
        server.removeFromMovedRegions(region.getEncodedName());

        if (previous == null || !previous.booleanValue()) {
          htd = htds.get(region.getTable());
          if (htd == null) {
            htd = server.getTableDescriptors().get(region.getTable());
            htds.put(region.getTable(), htd);
          }
          if (htd == null) {
            throw new IOException("Missing table descriptor for " + region.getEncodedName());
          }
          // If there is no action in progress, we can submit a specific handler.
          // Need to pass the expected version in the constructor.
          if (server.getExecutorService() == null) {
            LOG.info("No executor executorService; skipping open request");
          } else {
            if (region.isMetaRegion()) {
              server.getExecutorService()
                .submit(new OpenMetaHandler(server, server, region, htd, masterSystemTime));
            } else {
              if (regionOpenInfo.getFavoredNodesCount() > 0) {
                server.updateRegionFavoredNodesMapping(region.getEncodedName(),
                  regionOpenInfo.getFavoredNodesList());
              }
              // System tables and high-priority tables open on the priority handler pool.
              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
                server.getExecutorService().submit(
                  new OpenPriorityRegionHandler(server, server, region, htd, masterSystemTime));
              } else {
                server.getExecutorService()
                  .submit(new OpenRegionHandler(server, server, region, htd, masterSystemTime));
              }
            }
          }
        }

        builder.addOpeningState(RegionOpeningState.OPENED);
      } catch (IOException ie) {
        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
        if (isBulkAssign) {
          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
        } else {
          throw new ServiceException(ie);
        }
      }
    }
    return builder.build();
  }

  /**
   * Warmup a region on this server. This method should only be called by Master. It synchronously
   * opens the region and closes the region bringing the most important pages in cache.
   */
  @Override
  public WarmupRegionResponse warmupRegion(final RpcController controller,
    final WarmupRegionRequest request) throws ServiceException {
    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
    try {
      checkOpen();
      String encodedName = region.getEncodedName();
      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
      final HRegion onlineRegion = server.getRegion(encodedName);
      if (onlineRegion != null) {
        // Already serving the region here; nothing to pre-warm.
        LOG.info("{} is online; skipping warmup", region);
        return response;
      }
      TableDescriptor htd = server.getTableDescriptors().get(region.getTable());
      if (server.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
        LOG.info("{} is in transition; skipping warmup", region);
        return response;
      }
      LOG.info("Warmup {}", region.getRegionNameAsString());
      HRegion.warmupHRegion(region, htd, server.getWAL(region), server.getConfiguration(), server,
        null);
    } catch (IOException ie) {
      LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
      throw new ServiceException(ie);
    }

    return response;
  }

  // Detaches the cell scanner from the RPC controller and returns it, so the payload is consumed
  // exactly once by the caller.
  private ExtendedCellScanner getAndReset(RpcController controller) {
    HBaseRpcController hrc = (HBaseRpcController) controller;
    ExtendedCellScanner cells = hrc.cellScanner();
    hrc.setCellScanner(null);
    return cells;
  }

  /**
   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
   * that the given mutations will be durable on the receiving RS if this method returns without any
   * exception.
   * @param controller the RPC controller
   * @param request the request
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for
   *             compatibility with old region replica implementation. Now we will use
   *             {@code replicateToReplica} method instead.
   */
  @Deprecated
  @Override
  @QosPriority(priority = HConstants.REPLAY_QOS)
  public ReplicateWALEntryResponse replay(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    ExtendedCellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      // All entries in one request must target the same region; resolve it from the first entry.
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      RegionCoprocessorHost coprocessorHost =
        ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
          ? region.getCoprocessorHost()
          : null; // do not invoke coprocessors if this is a secondary region replica
      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();

      // Skip adding the edits to WAL if this is a secondary region replica
      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;

      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException("Replay request contains entries from multiple "
            + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
            + entry.getKey().getEncodedRegionName());
        }
        if (server.nonceManager != null && isPrimary) {
          // Report nonces seen in the WAL so replayed operations keep their dedup guarantees.
          long nonceGroup =
            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          server.nonceManager.reportOperationFromWal(nonceGroup, nonce,
            entry.getKey().getWriteTime());
        }
        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
        List<MutationReplay> edits =
          WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
        if (coprocessorHost != null) {
          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
          // KeyValue.
          if (
            coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
              walEntry.getSecond())
          ) {
            // if bypass this log entry, ignore it ...
            continue;
          }
          walEntries.add(walEntry);
        }
        if (edits != null && !edits.isEmpty()) {
          // HBASE-17924
          // sort to improve lock efficiency
          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
          long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
            ? entry.getKey().getOrigSequenceNumber()
            : entry.getKey().getLogSequenceNumber();
          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
          // check if it's a partial success
          for (int i = 0; result != null && i < result.length; i++) {
            if (result[i] != OperationStatus.SUCCESS) {
              throw new IOException(result[i].getExceptionMsg());
            }
          }
        }
      }

      // sync wal at the end because ASYNC_WAL is used above
      WAL wal = region.getWAL();
      if (wal != null) {
        wal.sync();
      }

      if (coprocessorHost != null) {
        for (Pair<WALKey, WALEdit> entry : walEntries) {
          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
            entry.getSecond());
        }
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record replay latency even on failure.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
      }
    }
  }

  /**
   * Replay the given changes on a secondary replica
   */
  @Override
  public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
    ReplicateWALEntryRequest request) throws ServiceException {
    CellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
        // This endpoint is only for secondary replicas; a primary target indicates a caller bug.
        throw new DoNotRetryIOException(
          "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
      }
      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException(
            "ReplicateToReplica request contains entries from multiple " + "regions. First region:"
              + regionName.toStringUtf8() + " , other region:"
              + entry.getKey().getEncodedRegionName());
        }
        region.replayWALEntry(entry, cells);
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  // Rejects incoming replication batches whose table is under sync replication with the peer in
  // (or transiting to) ACTIVE/DOWNGRADE_ACTIVE state.
  private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
    ReplicationSourceService replicationSource = server.getReplicationSourceService();
    if (replicationSource == null || entries.isEmpty()) {
      return;
    }
    // We can ensure that all entries are for one peer, so only need to check one entry's
    // table name. if the table hit sync replication at peer side and the peer cluster
    // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply
    // those entries according to the design doc.
    TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray());
    if (
      replicationSource.getSyncReplicationPeerInfoProvider().checkState(table,
        RejectReplicationRequestStateChecker.get())
    ) {
      throw new DoNotRetryIOException(
        "Reject to apply to sink cluster because sync replication state of sink cluster "
          + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table);
    }
  }

  /**
   * Replicate WAL entries on the region server.
2251 * @param controller the RPC controller 2252 * @param request the request 2253 */ 2254 @Override 2255 @QosPriority(priority = HConstants.REPLICATION_QOS) 2256 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, 2257 final ReplicateWALEntryRequest request) throws ServiceException { 2258 try { 2259 checkOpen(); 2260 if (server.getReplicationSinkService() != null) { 2261 requestCount.increment(); 2262 List<WALEntry> entries = request.getEntryList(); 2263 checkShouldRejectReplicationRequest(entries); 2264 ExtendedCellScanner cellScanner = getAndReset(controller); 2265 server.getRegionServerCoprocessorHost().preReplicateLogEntries(); 2266 server.getReplicationSinkService().replicateLogEntries(entries, cellScanner, 2267 request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), 2268 request.getSourceHFileArchiveDirPath()); 2269 server.getRegionServerCoprocessorHost().postReplicateLogEntries(); 2270 return ReplicateWALEntryResponse.newBuilder().build(); 2271 } else { 2272 throw new ServiceException("Replication services are not initialized yet"); 2273 } 2274 } catch (IOException ie) { 2275 throw new ServiceException(ie); 2276 } 2277 } 2278 2279 /** 2280 * Roll the WAL writer of the region server. 
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public RollWALWriterResponse rollWALWriter(final RpcController controller,
    final RollWALWriterRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      // Coprocessor hooks bracket the roll request; requestRollAll only submits the
      // request to the WAL roller (the roll itself is not guaranteed to be complete
      // when this RPC returns — NOTE(review): confirm against LogRoller semantics).
      server.getRegionServerCoprocessorHost().preRollWALWriterRequest();
      server.getWalRoller().requestRollAll();
      server.getRegionServerCoprocessorHost().postRollWALWriterRequest();
      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Stop the region server.
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public StopServerResponse stopServer(final RpcController controller,
    final StopServerRequest request) throws ServiceException {
    rpcPreCheck("stopServer");
    requestCount.increment();
    String reason = request.getReason();
    server.stop(reason);
    return StopServerResponse.newBuilder().build();
  }

  /**
   * Update the favored-nodes mapping for the given regions. Entries without any favored nodes
   * are skipped; the response carries the total number of entries received (not the number
   * actually applied).
   */
  @Override
  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
    UpdateFavoredNodesRequest request) throws ServiceException {
    rpcPreCheck("updateFavoredNodes");
    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
        server.updateRegionFavoredNodesMapping(hri.getEncodedName(),
          regionUpdateInfo.getFavoredNodesList());
      }
    }
    // Response reports the count of entries in the request, including skipped ones.
    respBuilder.setResponse(openInfoList.size());
    return respBuilder.build();
  }

  /**
   * Atomically bulk load several HFiles into an open region
   * @return true if successful, false is failed but recoverably (no action)
   * @throws ServiceException if failed unrecoverably
   */
  @Override
  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
    final BulkLoadHFileRequest request) throws ServiceException {
    long start = EnvironmentEdgeManager.currentTime();
    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
    // If this cluster's id is already in the request, the load has already passed through
    // here — report success without re-loading. Otherwise record ourselves before delegating.
    if (clusterIds.contains(this.server.getClusterId())) {
      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
    } else {
      clusterIds.add(this.server.getClusterId());
    }
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
      // -1 (negative) means "size unknown / quota disabled"; see the negative-size note below.
      long sizeToBeLoaded = -1;

      // Check to see if this bulk load would exceed the space quota for this table
      if (spaceQuotaEnabled) {
        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
        SpaceViolationPolicyEnforcement enforcement =
          activeSpaceQuotas.getPolicyEnforcement(region);
        if (enforcement != null) {
          // Bulk loads must still be atomic. We must enact all or none.
          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
          for (FamilyPath familyPath : request.getFamilyPathList()) {
            filePaths.add(familyPath.getPath());
          }
          // Check if the batch of files exceeds the current quota
          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
        }
      }
      // secure bulk load
      Map<byte[], List<Path>> map =
        server.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
      builder.setLoaded(map != null);
      if (map != null) {
        // Treat any negative size as a flag to "ignore" updating the region size as that is
        // not possible to occur in real life (cannot bulk load a file with negative size)
        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
              + sizeToBeLoaded + " bytes");
          }
          // Inform space quotas of the new files for this region
          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
            sizeToBeLoaded);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Bulk-load latency metric is recorded on both success and failure paths.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
      }
    }
  }

  /**
   * Obtain a bulk-load token for the given region from the secure bulk load manager.
   */
  @Override
  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
    PrepareBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      String bulkToken = server.getSecureBulkLoadManager().prepareBulkLoad(region, request);
      PrepareBulkLoadResponse.Builder builder =
        PrepareBulkLoadResponse.newBuilder();
      builder.setBulkToken(bulkToken);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Release the resources associated with a previously prepared bulk load for the given region.
   */
  @Override
  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
    CleanupBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      server.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
      return CleanupBulkLoadResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Execute a region-scoped coprocessor endpoint call and wrap its serialized result in the
   * response, tagged with the result message's class name.
   */
  @Override
  public CoprocessorServiceResponse execService(final RpcController controller,
    final CoprocessorServiceRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      Message result = execServiceOnRegion(region, request.getCall());
      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
        region.getRegionInfo().getRegionName()));
      // TODO: COPIES!!!!!!
      // result.toByteArray() plus ByteString.copyFrom make two copies of the payload (see
      // TODO above).
      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
        org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Resolve the filesystem to use for sizing a bulk-load batch: the server's own filesystem when
   * no paths are given, otherwise the filesystem of the first source path (all paths are assumed
   * to live on the same filesystem — TODO confirm).
   */
  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
    if (filePaths.isEmpty()) {
      // local hdfs
      return server.getFileSystem();
    }
    // source hdfs
    return new Path(filePaths.get(0)).getFileSystem(server.getConfiguration());
  }

  /** Run a coprocessor endpoint call on the region with a fresh server-side controller. */
  private Message execServiceOnRegion(HRegion region,
    final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
    // ignore the passed in controller (from the serialized call)
    ServerRpcController execController = new ServerRpcController();
    return region.execService(execController, serviceCall);
  }

  /**
   * @return true when the region's table is in a sync-replication state that must reject
   *         client-originated requests (i.e. this side is a STANDBY sink).
   */
  private boolean shouldRejectRequestsFromClient(HRegion region) {
    TableName table = region.getRegionInfo().getTable();
    ReplicationSourceService service = server.getReplicationSourceService();
    return service != null && service.getSyncReplicationPeerInfoProvider().checkState(table,
      RejectRequestsFromClientStateChecker.get());
  }

  /** Throw DoNotRetryIOException when client requests must be rejected for this region. */
  private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
    if (shouldRejectRequestsFromClient(region)) {
      throw new DoNotRetryIOException(
        region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state.");
    }
  }

  /**
   * Get data from a table.
   * @param controller the RPC controller
   * @param request the get request
   */
  @Override
  public GetResponse get(final RpcController controller, final GetRequest request)
    throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    OperationQuota quota = null;
    HRegion region = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    try {
      checkOpen();
      requestCount.increment();
      rpcGetRequestCount.increment();
      region = getRegion(request.getRegion());
      rejectIfInStandByState(region);

      GetResponse.Builder builder = GetResponse.newBuilder();
      ClientProtos.Get get = request.getGet();
      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
          + "reverse Scan.");
      }
      Boolean existence = null;
      Result r = null;
      quota = getRpcQuotaManager().checkBatchQuota(region, OperationQuota.OperationType.GET);

      Get clientGet = ProtobufUtil.toGet(get);
      // preExists hook may short-circuit an existence-only get entirely.
      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
        existence = region.getCoprocessorHost().preExists(clientGet);
      }
      if (existence == null) {
        if (context != null) {
          r = get(clientGet, (region), null, context);
        } else {
          // for test purpose
          r = region.get(clientGet);
        }
        if (get.getExistenceOnly()) {
          boolean exists = r.getExists();
          if (region.getCoprocessorHost() != null) {
            exists = region.getCoprocessorHost().postExists(clientGet, exists);
          }
          existence = exists;
        }
      }
      if (existence != null) {
        // Existence-only answers carry no cells; only the stale flag (replica reads) and the
        // boolean matter.
        ClientProtos.Result pbr =
          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
        builder.setResult(pbr);
      } else if (r != null) {
        ClientProtos.Result pbr;
        // Cell-block capable clients (>= 1.3) get cells via the controller's cell scanner
        // instead of inline protobuf, avoiding a copy into the message.
        if (
          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
        ) {
          pbr = ProtobufUtil.toResultNoData(r);
          ((HBaseRpcController) controller).setCellScanner(
            PrivateCellUtil.createExtendedCellScanner(ClientInternalHelper.getExtendedRawCells(r)));
          addSize(context, r);
        } else {
          pbr = ProtobufUtil.toResult(r);
        }
        builder.setResult(pbr);
      }
      // r.cells is null when an table.exists(get) call
      if (r != null && r.rawCells() != null) {
        quota.addGetResult(r);
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null && region != null) {
        long blockBytesScanned = context != null ? context.getBlockBytesScanned() : 0;
        metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
          blockBytesScanned);
      }
      // Quota is released on every exit path, success or failure.
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Server-side implementation of a Get, built on a single-row Scan so the scanner can be
   * registered on the RpcCallContext (or the multi() close callback) for deferred close.
   * @param get           the client get, already converted from protobuf
   * @param region        region to read from
   * @param closeCallBack non-null when called from multi(): collects scanners for batched close;
   *                      null for a standalone get(), where the context closes the scanner
   * @param context       current RPC call context; must be non-null (used for callbacks and
   *                      block-bytes-scanned accounting)
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
    RpcCallContext context) throws IOException {
    region.prepareGet(get);
    // Reads against a non-default replica are reported as potentially stale.
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        // Hook bypassed the core read; return whatever the hook populated.
        region.metricsUpdateForGet();
        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
          stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    long blockBytesScannedBefore = context.getBlockBytesScanned();
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet();

    Result r =
      Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
    if (get.isQueryMetricsEnabled()) {
      // Report only the block bytes scanned by this call, not the whole RPC so far.
      long blockBytesScanned = context.getBlockBytesScanned() - blockBytesScannedBefore;
      r.setMetrics(new QueryMetrics(blockBytesScanned));
    }
    return r;
  }

  /**
   * Sum the action counts across all RegionActions of a multi request; warn (and optionally
   * reject, per configuration) when the total exceeds the row-size warning threshold. See
   * HBASE-18023.
   * @throws ServiceException when rejection of oversized batches is enabled and the batch is
   *                          over the threshold
   */
  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
    int sum = 0;
    String firstRegionName = null;
    for (RegionAction regionAction : request.getRegionActionList()) {
      if (sum == 0) {
        // Remember the first region's name for diagnostics only.
        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
      }
      sum += regionAction.getActionCount();
    }
    if (sum > rowSizeWarnThreshold) {
      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
        + ") (HBASE-18023)."
        + " Requested Number of Rows: " + sum + " Client: "
        + RpcServer.getRequestUserName().orElse(null) + "/"
        + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
      if (rejectRowsWithSizeOverThreshold) {
        throw new ServiceException(
          "Rejecting large batch operation for current batch with firstRegionName: "
            + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
            + rowSizeWarnThreshold);
      }
    }
  }

  /**
   * Record a whole-RegionAction failure in the multi response: count the exception in metrics,
   * attach it to the region's result, and skip the cell-block cells belonging to the failed
   * mutations so the scanner stays aligned for subsequent RegionActions.
   */
  private void failRegionAction(MultiResponse.Builder responseBuilder,
    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
    CellScanner cellScanner, Throwable error) {
    rpcServer.getMetrics().exception(error);
    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
    // All Mutations in this RegionAction not executed as we can not see the Region online here
    // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
    // corresponding to these Mutations.
    if (cellScanner != null) {
      skipCellsForMutations(regionAction.getActionList(), cellScanner);
    }
  }

  /**
   * @return true when the action is a PUT/DELETE mutation carrying the replication marker
   *         attribute, i.e. it originated from replication rather than a client.
   */
  private boolean isReplicationRequest(Action action) {
    // replication request can only be put or delete.
    if (!action.hasMutation()) {
      return false;
    }
    MutationProto mutation = action.getMutation();
    MutationType type = mutation.getMutateType();
    if (type != MutationType.PUT && type != MutationType.DELETE) {
      return false;
    }
    // replication will set a special attribute so we can make use of it to decide whether a request
    // is for replication.
    return mutation.getAttributeList().stream().map(p -> p.getName())
      .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
  }

  /**
   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
   * @param rpcc the RPC controller
   * @param request the multi request
   */
  @Override
  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
    throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }

    checkBatchSizeAndLogLargeSize(request);

    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? getAndReset(controller) : null;

    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;

    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
    // This builder is reused (clear()ed) across RegionActions in the loop below.
    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
    this.rpcMultiRequestCount.increment();
    this.requestCount.increment();
    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();

    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
    // following logic is for backward compatibility as old clients still use
    // MultiRequest#condition in case of checkAndMutate with RowMutations.
    if (request.hasCondition()) {
      if (request.getRegionActionList().isEmpty()) {
        // If the region action list is empty, do nothing.
        responseBuilder.setProcessed(true);
        return responseBuilder.build();
      }

      // Legacy path handles exactly one RegionAction.
      RegionAction regionAction = request.getRegionAction(0);

      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
      // we can assume regionAction.getAtomic() is true here.
      assert regionAction.getAtomic();

      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
          regionAction.hasCondition());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        return responseBuilder.build();
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
        // We only allow replication in standby state and it will not set the atomic flag.
        if (rejectIfFromClient) {
          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
            new DoNotRetryIOException(
              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
          return responseBuilder.build();
        }

        try {
          CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
            cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
          responseBuilder.setProcessed(result.isSuccess());
          ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
            ClientProtos.ResultOrException.newBuilder();
          for (int i = 0; i < regionAction.getActionCount(); i++) {
            // To unify the response format with doNonAtomicRegionMutation and read through
            // client's AsyncProcess we have to add an empty result instance per operation
            resultOrExceptionOrBuilder.clear();
            resultOrExceptionOrBuilder.setIndex(i);
            regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
          }
        } catch (IOException e) {
          rpcServer.getMetrics().exception(e);
          // As it's an atomic operation with a condition, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        }
      } finally {
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
      }
      return responseBuilder.build();
    }

    // this will contain all the cells that we need to return. It's created later, if needed.
    List<ExtendedCellScannable> cellsToReturn = null;
    RegionScannersCloseCallBack closeCallBack = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
      new HashMap<>(request.getRegionActionCount());

    for (RegionAction regionAction : request.getRegionActionList()) {
      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();
      regionActionResultBuilder.clear();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
          regionAction.hasCondition());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        continue; // For this region it's a failure.
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);

        if (regionAction.hasCondition()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }

          try {
            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
              ClientProtos.ResultOrException.newBuilder();
            if (regionAction.getActionCount() == 1) {
              // Single-mutation checkAndMutate: result (and optional query metrics) are
              // attached to the one ResultOrException entry.
              CheckAndMutateResult result =
                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              resultOrExceptionOrBuilder.setIndex(0);
              if (result.getResult() != null) {
                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
              }

              if (result.getMetrics() != null) {
                resultOrExceptionOrBuilder
                  .setMetrics(ProtobufUtil.toQueryMetrics(result.getMetrics()));
              }

              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
            } else {
              // checkAndMutate over a RowMutations batch.
              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              for (int i = 0; i < regionAction.getActionCount(); i++) {
                if (i == 0 && result.getResult() != null) {
                  // Set the result of the Increment/Append operations to the first element of the
                  // ResultOrException list
                  resultOrExceptionOrBuilder.setIndex(i);
                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
                  continue;
                }
                // To unify the response format with doNonAtomicRegionMutation and read through
                // client's AsyncProcess we have to add an empty result instance per operation
                resultOrExceptionOrBuilder.clear();
                resultOrExceptionOrBuilder.setIndex(i);
                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
              }
            }
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's an atomic operation with a condition, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }
          try {
            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
              cellScanner, nonceGroup, spaceQuotaEnforcement);
            regionActionResultBuilder.setProcessed(true);
            // We no longer use MultiResponse#processed. Instead, we use
            // RegionActionResult#processed. This is for backward compatibility for old clients.
            responseBuilder.setProcessed(true);
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's atomic, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else {
          if (
            rejectIfFromClient && regionAction.getActionCount() > 0
              && !isReplicationRequest(regionAction.getAction(0))
          ) {
            // fail if it is not a replication request
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }
          // doNonAtomicRegionMutation manages the exception internally
          if (context != null && closeCallBack == null) {
            // An RpcCallBack that creates a list of scanners that needs to perform callBack
            // operation on completion of multiGets.
            // Set this only once
            closeCallBack = new RegionScannersCloseCallBack();
            context.setCallBack(closeCallBack);
          }
          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
            spaceQuotaEnforcement);
        }
      } finally {
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        regionStats.put(regionSpecifier, regionLoadStats);
      }
    }
    // Load the controller with the Cells to return.
    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
      controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(cellsToReturn));
    }

    // Aggregate per-region load stats collected in the loop into the response.
    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
      builder.addRegion(stat.getKey());
      builder.addStat(stat.getValue());
    }
    responseBuilder.setRegionStatistics(builder);
    return responseBuilder.build();
  }

  /** Skip the cell-block cells of every action in the list; no-op when there is no scanner. */
  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    for (Action action : actions) {
      skipCellsForMutation(action, cellScanner);
    }
  }

  /**
   * Advance the cell scanner past the cells associated with a single (failed) mutation so later
   * RegionActions read their own cells from the correct position.
   */
  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    try {
      if (action.hasMutation()) {
        MutationProto m = action.getMutation();
        if (m.hasAssociatedCellCount()) {
          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
            cellScanner.advance();
          }
        }
      }
    } catch (IOException e) {
      // No need to handle these individual Mutation-level issues. Either way this entire
      // RegionAction is marked as failed as we could not see the Region here. At client side
      // the top level RegionAction exception will be considered first.
      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
    }
  }

  /**
   * Mutate data in a table.
   * @param rpcc the RPC controller
   * @param request the mutate request
   */
  @Override
  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
    throws ServiceException {
    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
    OperationQuota quota = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    // Clear scanner so we are not holding on to reference across call.
    if (controller != null) {
      controller.setCellScanner(null);
    }
    try {
      checkOpen();
      requestCount.increment();
      rpcMutateRequestCount.increment();
      HRegion region = getRegion(request.getRegion());
      rejectIfInStandByState(region);
      MutateResponse.Builder builder = MutateResponse.newBuilder();
      MutationProto mutation = request.getMutation();
      // Possibly trigger flushes to free memstore memory before taking on a new write
      // (skipped for meta).
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
      OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request);
      quota = getRpcQuotaManager().checkBatchQuota(region, operationType);
      ActivePolicyEnforcement spaceQuotaEnforcement =
        getSpaceQuotaManager().getActiveEnforcements();

      if (request.hasCondition()) {
        // Conditional (checkAndMutate) path.
        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
          request.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
        builder.setProcessed(result.isSuccess());
        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
        if (clientCellBlockSupported) {
          addSize(context, result.getResult());
        }
        if (result.getMetrics() != null) {
          builder.setMetrics(ProtobufUtil.toQueryMetrics(result.getMetrics()));
        }
      } else {
        // Unconditional single mutation: dispatch on the mutation type. Only APPEND and
        // INCREMENT produce a Result; PUT/DELETE just report processed.
        Result r = null;
        Boolean processed = null;
        MutationType type = mutation.getMutateType();
        switch (type) {
          case APPEND:
            // TODO: this doesn't actually check anything.
            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
              context);
            break;
          case INCREMENT:
            // TODO: this doesn't actually check anything.
            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
              context);
            break;
          case PUT:
            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          case DELETE:
            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          default:
            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
        }
        if (processed != null) {
          builder.setProcessed(processed);
        }
        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
        addResult(builder, r, controller, clientCellBlockSupported);
        if (clientCellBlockSupported) {
          addSize(context, r);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      // A mutate failure may indicate filesystem trouble; have the server verify.
      server.checkFileSystem();
      throw new ServiceException(ie);
    } finally {
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Apply a single Put: enforce cell-size limit and space quota, account it against the
   * operation quota, write it, and record put latency.
   */
  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Put put = ProtobufUtil.toPut(mutation, cellScanner);
    checkCellSizeLimit(region, put);
    spaceQuota.getPolicyEnforcement(region).check(put);
    quota.addMutation(put);
    region.put(put);

    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updatePut(region, after - before);
    }
  }

  /**
   * Apply a single Delete: enforce cell-size limit and space quota, account it against the
   * operation quota, apply it, and record delete latency.
   */
  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner
cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException { 3071 long before = EnvironmentEdgeManager.currentTime(); 3072 Delete delete = ProtobufUtil.toDelete(mutation, cellScanner); 3073 checkCellSizeLimit(region, delete); 3074 spaceQuota.getPolicyEnforcement(region).check(delete); 3075 quota.addMutation(delete); 3076 region.delete(delete); 3077 3078 MetricsRegionServer metricsRegionServer = server.getMetrics(); 3079 if (metricsRegionServer != null) { 3080 long after = EnvironmentEdgeManager.currentTime(); 3081 metricsRegionServer.updateDelete(region, after - before); 3082 } 3083 } 3084 3085 private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota, 3086 MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup, 3087 ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException { 3088 long before = EnvironmentEdgeManager.currentTime(); 3089 long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0; 3090 CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner); 3091 long nonce = mutation.hasNonce() ? 
  /**
   * Performs an atomic check-and-mutate: the mutation is applied only if the supplied condition
   * matches. Coprocessor hooks can fully replace the operation: if preCheckAndMutate returns a
   * non-null result, the region-level checkAndMutate (and the post hook) are skipped entirely.
   * Latency and block-bytes-scanned are recorded in server metrics, plus a per-type
   * (checkAndPut / checkAndDelete) latency metric.
   * @param region      target region
   * @param quota       operation quota charged for the underlying mutation
   * @param mutation    protobuf mutation to apply when the condition matches
   * @param cellScanner carries the cell payload when the client used a cell block
   * @param condition   the condition that gates the mutation
   * @param nonceGroup  client nonce group for idempotence tracking
   * @param spaceQuota  active space-quota policies to enforce before writing
   * @param context     current RPC call context, may be null (e.g. direct invocation)
   * @return the result of the check-and-mutate, possibly produced by a coprocessor
   * @throws IOException if conversion, a quota/size check, or the region operation fails
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    // Snapshot block bytes scanned so far so we can report only this operation's delta.
    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    // Enforce limits on the wrapped action before executing anything.
    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
    quota.addMutation((Mutation) checkAndMutate.getAction());

    CheckAndMutateResult result = null;
    if (region.getCoprocessorHost() != null) {
      // A non-null result from the pre hook bypasses the core operation below.
      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
    }
    if (result == null) {
      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
      if (region.getCoprocessorHost() != null) {
        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
      }
    }
    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      long blockBytesScanned =
        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
      metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned);

      // Also bump the mutation-type-specific latency metric for PUT/DELETE.
      MutationType type = mutation.getMutateType();
      switch (type) {
        case PUT:
          metricsRegionServer.updateCheckAndPut(region, after - before);
          break;
        case DELETE:
          metricsRegionServer.updateCheckAndDelete(region, after - before);
          break;
        default:
          break;
      }
    }
    return result;
  }
3131 @Deprecated 3132 private static final IOException SCANNER_ALREADY_CLOSED = new IOException() { 3133 3134 private static final long serialVersionUID = -4305297078988180130L; 3135 3136 @Override 3137 public synchronized Throwable fillInStackTrace() { 3138 return this; 3139 } 3140 }; 3141 3142 private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException { 3143 String scannerName = toScannerName(request.getScannerId()); 3144 RegionScannerHolder rsh = this.scanners.get(scannerName); 3145 if (rsh == null) { 3146 // just ignore the next or close request if scanner does not exists. 3147 Long lastCallSeq = closedScanners.getIfPresent(scannerName); 3148 if (lastCallSeq != null) { 3149 // Check the sequence number to catch if the last call was incorrectly retried. 3150 // The only allowed scenario is when the scanner is exhausted and one more scan 3151 // request arrives - in this case returning 0 rows is correct. 3152 if (request.hasNextCallSeq() && request.getNextCallSeq() != lastCallSeq + 1) { 3153 throw new OutOfOrderScannerNextException("Expected nextCallSeq for closed request: " 3154 + (lastCallSeq + 1) + " But the nextCallSeq got from client: " 3155 + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request)); 3156 } else { 3157 throw SCANNER_ALREADY_CLOSED; 3158 } 3159 } else { 3160 LOG.warn("Client tried to access missing scanner " + scannerName); 3161 throw new UnknownScannerException( 3162 "Unknown scanner '" + scannerName + "'. 
This can happen due to any of the following " 3163 + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " 3164 + "long wait between consecutive client checkins, c) Server may be closing down, " 3165 + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " 3166 + "possible fix would be increasing the value of" 3167 + "'hbase.client.scanner.timeout.period' configuration."); 3168 } 3169 } 3170 rejectIfInStandByState(rsh.r); 3171 RegionInfo hri = rsh.s.getRegionInfo(); 3172 // Yes, should be the same instance 3173 if (server.getOnlineRegion(hri.getRegionName()) != rsh.r) { 3174 String msg = "Region has changed on the scanner " + scannerName + ": regionName=" 3175 + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r; 3176 LOG.warn(msg + ", closing..."); 3177 scanners.remove(scannerName); 3178 try { 3179 rsh.s.close(); 3180 } catch (IOException e) { 3181 LOG.warn("Getting exception closing " + scannerName, e); 3182 } finally { 3183 try { 3184 server.getLeaseManager().cancelLease(scannerName); 3185 } catch (LeaseException e) { 3186 LOG.warn("Getting exception closing " + scannerName, e); 3187 } 3188 } 3189 throw new NotServingRegionException(msg); 3190 } 3191 return rsh; 3192 } 3193 3194 /** 3195 * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder 3196 * value. 3197 */ 3198 private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request, HRegion region, 3199 ScanResponse.Builder builder) throws IOException { 3200 rejectIfInStandByState(region); 3201 ClientProtos.Scan protoScan = request.getScan(); 3202 boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); 3203 Scan scan = ProtobufUtil.toScan(protoScan); 3204 // if the request doesn't set this, get the default region setting. 
  /**
   * Opens a brand-new region scanner for an open-scanner request and registers it with this
   * server's outstanding-scanner map.
   * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
   *         value.
   */
  private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request, HRegion region,
    ScanResponse.Builder builder) throws IOException {
    rejectIfInStandByState(region);
    ClientProtos.Scan protoScan = request.getScan();
    boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
    Scan scan = ProtobufUtil.toScan(protoScan);
    // if the request doesn't set this, get the default region setting.
    if (!isLoadingCfsOnDemandSet) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }

    if (!scan.hasFamilies()) {
      // Adding all families to scanner
      for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
        scan.addFamily(family);
      }
    }
    if (region.getCoprocessorHost() != null) {
      // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
      // wrapper for the core created RegionScanner
      region.getCoprocessorHost().preScannerOpen(scan);
    }
    // Keep a handle on the core scanner as the Shipper even if a coprocessor wraps the
    // RegionScanner below.
    RegionScannerImpl coreScanner = region.getScanner(scan);
    Shipper shipper = coreScanner;
    RegionScanner scanner = coreScanner;
    try {
      if (region.getCoprocessorHost() != null) {
        scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
      }
    } catch (Exception e) {
      // Although region coprocessor is for advanced users and they should take care of the
      // implementation to not damage the HBase system, closing the scanner on exception here does
      // not have any bad side effect, so let's do it
      scanner.close();
      throw e;
    }
    long scannerId = scannerIdGenerator.generateNewScannerId();
    builder.setScannerId(scannerId);
    builder.setMvccReadPoint(scanner.getMvccReadPoint());
    builder.setTtl(scannerLeaseTimeoutPeriod);
    String scannerName = toScannerName(scannerId);

    // Full-region scans on user tables are tracked separately (see rpcFullScanRequestCount).
    boolean fullRegionScan =
      !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region);

    return new Pair<String, RegionScannerHolder>(scannerName,
      addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan));
  }
  /**
   * The returned String is used as key doing look up of outstanding Scanners in this Servers'
   * this.scanners, the Map of outstanding scanners and their current state.
   * @param scannerId A scanner long id.
   * @return The long id as a String.
   */
  private static String toScannerName(long scannerId) {
    return Long.toString(scannerId);
  }

  /**
   * Validates and advances the scanner's expected call sequence number for this request.
   * Detects incorrectly-retried scan RPCs before any lease handling happens.
   * @throws OutOfOrderScannerNextException if the client-supplied nextCallSeq does not match
   */
  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
    throws OutOfOrderScannerNextException {
    // if nextCallSeq does not match throw Exception straight away. This needs to be
    // performed even before checking of Lease.
    // See HBASE-5974
    if (request.hasNextCallSeq()) {
      long callSeq = request.getNextCallSeq();
      if (!rsh.incNextCallSeq(callSeq)) {
        throw new OutOfOrderScannerNextException(
          "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: "
            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request)
            + "; region=" + rsh.r.getRegionInfo().getRegionNameAsString());
      }
    }
  }

  /**
   * Returns a scanner lease (previously removed for the duration of request processing) to the
   * LeaseManager so expiration tracking resumes.
   */
  private void addScannerLeaseBack(LeaseManager.Lease lease) {
    try {
      server.getLeaseManager().addLease(lease);
    } catch (LeaseStillHeldException e) {
      // should not happen as the scanner id is unique.
      throw new AssertionError(e);
    }
  }
  /**
   * Computes the absolute deadline (epoch millis) by which this scan RPC should yield a
   * (possibly heartbeat) response, or -1 when no time limit should be enforced.
   */
  // visible for testing only
  long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller,
    boolean allowHeartbeatMessages) {
    // Set the time limit to be half of the more restrictive timeout value (one of the
    // timeout values must be positive). In the event that both values are positive, the
    // more restrictive of the two is used to calculate the limit.
    if (allowHeartbeatMessages) {
      long now = EnvironmentEdgeManager.currentTime();
      long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now);
      if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) {
        long timeLimitDelta;
        if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) {
          timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout);
        } else {
          // Only one of the two is positive; use it.
          timeLimitDelta =
            scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout;
        }

        // Use half of whichever timeout value was more restrictive... But don't allow
        // the time limit to be less than the allowable minimum (could cause an
        // immediate timeout before scanning any data).
        timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
        return now + timeLimitDelta;
      }
    }
    // Default value of timeLimit is negative to indicate no timeLimit should be
    // enforced.
    return -1L;
  }

  /**
   * Returns the RPC timeout budget remaining at {@code now}, preferring the controller's
   * per-call timeout over the server's configured rpcTimeout; -1 when neither is set.
   * Queue wait time (now - receive time) is subtracted when the call is available.
   */
  private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) {
    long timeout;
    if (controller != null && controller.getCallTimeout() > 0) {
      timeout = controller.getCallTimeout();
    } else if (rpcTimeout > 0) {
      timeout = rpcTimeout;
    } else {
      return -1;
    }
    if (call != null) {
      timeout -= (now - call.getReceiveTime());
    }
    // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high.
    // return minimum value here in that case so we count this in calculating the final delta.
    return Math.max(minimumScanTimeLimitDelta, timeout);
  }
3326 return Math.max(minimumScanTimeLimitDelta, timeout); 3327 } 3328 3329 private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows, 3330 ScannerContext scannerContext, ScanResponse.Builder builder) { 3331 if (numOfCompleteRows >= limitOfRows) { 3332 if (LOG.isTraceEnabled()) { 3333 LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows 3334 + " scannerContext: " + scannerContext); 3335 } 3336 builder.setMoreResults(false); 3337 } 3338 } 3339 3340 // return whether we have more results in region. 3341 private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh, 3342 long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results, 3343 ScanResponse.Builder builder, RpcCall rpcCall, ServerSideScanMetrics scanMetrics) 3344 throws IOException { 3345 HRegion region = rsh.r; 3346 RegionScanner scanner = rsh.s; 3347 long maxResultSize; 3348 if (scanner.getMaxResultSize() > 0) { 3349 maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize); 3350 } else { 3351 maxResultSize = maxQuotaResultSize; 3352 } 3353 // This is cells inside a row. Default size is 10 so if many versions or many cfs, 3354 // then we'll resize. Resizings show in profiler. Set it higher than 10. For now 3355 // arbitrary 32. TODO: keep record of general size of results being returned. 3356 ArrayList<Cell> values = new ArrayList<>(32); 3357 region.startRegionOperation(Operation.SCAN); 3358 long before = EnvironmentEdgeManager.currentTime(); 3359 // Used to check if we've matched the row limit set on the Scan 3360 int numOfCompleteRows = 0; 3361 // Count of times we call nextRaw; can be > numOfCompleteRows. 
  /**
   * Core scan loop for one scan RPC: repeatedly calls {@code RegionScanner.nextRaw} under the
   * scanner lock, accumulating Results into {@code results} until a size/time/row limit is hit or
   * the region is exhausted. Sets moreResultsInRegion (and, when allowed, heartbeat/cursor state)
   * on the response builder, and updates server-side scan metrics even on error.
   * @param scanMetrics non-null when the client asked for server-side scan metrics
   */
  // return whether we have more results in region.
  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
    long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
    ScanResponse.Builder builder, RpcCall rpcCall, ServerSideScanMetrics scanMetrics)
    throws IOException {
    HRegion region = rsh.r;
    RegionScanner scanner = rsh.s;
    // Effective cell-size limit: the scanner's own max, capped by the quota-derived max.
    long maxResultSize;
    if (scanner.getMaxResultSize() > 0) {
      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
    } else {
      maxResultSize = maxQuotaResultSize;
    }
    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
    // arbitrary 32. TODO: keep record of general size of results being returned.
    ArrayList<Cell> values = new ArrayList<>(32);
    region.startRegionOperation(Operation.SCAN);
    long before = EnvironmentEdgeManager.currentTime();
    // Used to check if we've matched the row limit set on the Scan
    int numOfCompleteRows = 0;
    // Count of times we call nextRaw; can be > numOfCompleteRows.
    int numOfNextRawCalls = 0;
    try {
      int numOfResults = 0;
      synchronized (scanner) {
        // Non-default replicas serve possibly-stale data.
        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
        boolean clientHandlesPartials =
          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
        boolean clientHandlesHeartbeats =
          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();

        // On the server side we must ensure that the correct ordering of partial results is
        // returned to the client to allow them to properly reconstruct the partial results.
        // If the coprocessor host is adding to the result list, we cannot guarantee the
        // correct ordering of partial results and so we prevent partial results from being
        // formed.
        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
        boolean moreRows = false;

        // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
        // certain time threshold on the server. When the time threshold is exceeded, the
        // server stops the scan and sends back whatever Results it has accumulated within
        // that time period (may be empty). Since heartbeat messages have the potential to
        // create partial Results (in the event that the timeout occurs in the middle of a
        // row), we must only generate heartbeat messages when the client can handle both
        // heartbeats AND partials
        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;

        long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);

        final LimitScope sizeScope =
          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
        final LimitScope timeScope =
          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;

        // Configure with limits for this RPC. Set keep progress true since size progress
        // towards size limit should be kept between calls to nextRaw
        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
        // of heap size occupied by cells(being read). Cell data means its key and value parts.
        // maxQuotaResultSize - max results just from server side configuration and quotas, without
        // user's specified max. We use this for evaluating limits based on blocks (not cells).
        // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
        // cell or block size from maximum here so we adhere to total limits of request.
        // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
        // have accumulated block bytes. If not, this will be 0 for block size.
        long maxCellSize = maxResultSize;
        long maxBlockSize = maxQuotaResultSize;
        if (rpcCall != null) {
          maxBlockSize -= rpcCall.getBlockBytesScanned();
          maxCellSize -= rpcCall.getResponseCellSize();
        }

        contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
        contextBuilder.setBatchLimit(scanner.getBatch());
        contextBuilder.setTimeLimit(timeScope, timeLimit);
        contextBuilder.setTrackMetrics(scanMetrics != null);
        contextBuilder.setScanMetrics(scanMetrics);
        ScannerContext scannerContext = contextBuilder.build();
        boolean limitReached = false;
        long blockBytesScannedBefore = 0;
        while (numOfResults < maxResults) {
          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
          // batch limit is a limit on the number of cells per Result. Thus, if progress is
          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
          // reset the batch progress between nextRaw invocations since we don't want the
          // batch progress from previous calls to affect future calls
          scannerContext.setBatchProgress(0);
          assert values.isEmpty();

          // Collect values to be returned here
          moreRows = scanner.nextRaw(values, scannerContext);

          // Per-nextRaw block-bytes delta, reported in optional per-result query metrics.
          long blockBytesScanned = scannerContext.getBlockSizeProgress() - blockBytesScannedBefore;
          blockBytesScannedBefore = scannerContext.getBlockSizeProgress();

          if (rpcCall == null) {
            // When there is no RpcCallContext,copy EC to heap, then the scanner would close,
            // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
            // buffers.See more details in HBASE-26036.
            CellUtil.cloneIfNecessary(values);
          }
          numOfNextRawCalls++;

          if (!values.isEmpty()) {
            if (limitOfRows > 0) {
              // First we need to check if the last result is partial and we have a row change. If
              // so then we need to increase the numOfCompleteRows.
              if (results.isEmpty()) {
                if (
                  rsh.rowOfLastPartialResult != null
                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              } else {
                Result lastResult = results.get(results.size() - 1);
                if (
                  lastResult.mayHaveMoreCellsInRow()
                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              }
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);

            if (request.getScan().getQueryMetricsEnabled()) {
              builder.addQueryMetrics(ClientProtos.QueryMetrics.newBuilder()
                .setBlockBytesScanned(blockBytesScanned).build());
            }

            results.add(r);
            numOfResults++;
            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
              numOfCompleteRows++;
              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
          } else if (!moreRows && !results.isEmpty()) {
            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
            // last result is false. Otherwise it's possible that: the first nextRaw returned
            // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
            // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
            // return with moreRows=false, which means moreResultsInRegion would be false, it will
            // be a contradictory state (HBASE-21206).
            int lastIdx = results.size() - 1;
            Result r = results.get(lastIdx);
            if (r.mayHaveMoreCellsInRow()) {
              results.set(lastIdx, ClientInternalHelper.createResult(
                ClientInternalHelper.getExtendedRawCells(r), r.getExists(), r.isStale(), false));
            }
          }
          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
          boolean resultsLimitReached = numOfResults >= maxResults;
          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

          if (limitReached || !moreRows) {
            // With block size limit, we may exceed size limit without collecting any results.
            // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
            // or cursor if results were collected, for example for cell size or heap size limits.
            boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
            // We only want to mark a ScanResponse as a heartbeat message in the event that
            // there are more values to be read server side. If there aren't more values,
            // marking it as a heartbeat is wasteful because the client will need to issue
            // another ScanRequest only to realize that they already have all the values
            if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
              // Heartbeat messages occur when the time limit has been reached, or size limit has
              // been reached before collecting any results. This can happen for heavily filtered
              // scans which scan over too many blocks.
              builder.setHeartbeatMessage(true);
              if (rsh.needCursor) {
                Cell cursorCell = scannerContext.getLastPeekedCell();
                if (cursorCell != null) {
                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
                }
              }
            }
            break;
          }
          values.clear();
        }
        if (rpcCall != null) {
          rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
        }
        builder.setMoreResultsInRegion(moreRows);
        // Check to see if the client requested that we track metrics server side. If the
        // client requested metrics, retrieve the metrics from the scanner context.
        if (scanMetrics != null) {
          // rather than increment yet another counter in StoreScanner, just set the value here
          // from block size progress before writing into the response
          scanMetrics.setCounter(ServerSideScanMetrics.BLOCK_BYTES_SCANNED_KEY_METRIC_NAME,
            scannerContext.getBlockSizeProgress());
        }
      }
    } finally {
      region.closeRegionOperation();
      // Update serverside metrics, even on error.
      long end = EnvironmentEdgeManager.currentTime();

      long responseCellSize = 0;
      long blockBytesScanned = 0;
      if (rpcCall != null) {
        responseCellSize = rpcCall.getResponseCellSize();
        blockBytesScanned = rpcCall.getBlockBytesScanned();
        rsh.updateBlockBytesScanned(blockBytesScanned);
      }
      region.getMetrics().updateScan();
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned);
        metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls);
      }
    }
    // coprocessor postNext hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
    }
  }
  /**
   * Scan data in a table. Entry point for all client scan RPCs: open-scanner, next, renew-lease
   * and close requests all arrive here. Handles lease removal/restore around processing,
   * call-sequence validation, quota accounting, coprocessor pre/post hooks, optional server-side
   * scan metrics, and the close-on-error contract described inline.
   * @param controller the RPC controller
   * @param request    the scan request
   */
  @Override
  public ScanResponse scan(final RpcController controller, final ScanRequest request)
    throws ServiceException {
    if (controller != null && !(controller instanceof HBaseRpcController)) {
      throw new UnsupportedOperationException(
        "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
    }
    if (!request.hasScannerId() && !request.hasScan()) {
      throw new ServiceException(
        new DoNotRetryIOException("Missing required input: scannerId or scan"));
    }
    try {
      checkOpen();
    } catch (IOException e) {
      // Server is going down: best-effort cancel of the scanner's lease before failing the call.
      if (request.hasScannerId()) {
        String scannerName = toScannerName(request.getScannerId());
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Server shutting down and client tried to access missing scanner " + scannerName);
        }
        final LeaseManager leaseManager = server.getLeaseManager();
        if (leaseManager != null) {
          try {
            leaseManager.cancelLease(scannerName);
          } catch (LeaseException le) {
            // No problem, ignore
            if (LOG.isTraceEnabled()) {
              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
            }
          }
        }
      }
      throw new ServiceException(e);
    }
    boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(trackMetrics);
    if (trackMetrics) {
      ThreadLocalServerSideScanMetrics.reset();
    }
    requestCount.increment();
    rpcScanRequestCount.increment();
    RegionScannerContext rsx;
    ScanResponse.Builder builder = ScanResponse.newBuilder();
    try {
      rsx = checkQuotaAndGetRegionScannerContext(request, builder);
    } catch (IOException e) {
      if (e == SCANNER_ALREADY_CLOSED) {
        // Now we will close scanner automatically if there are no more results for this region but
        // the old client will still send a close request to us. Just ignore it and return.
        return builder.build();
      }
      throw new ServiceException(e);
    }
    String scannerName = rsx.scannerName;
    RegionScannerHolder rsh = rsx.holder;
    OperationQuota quota = rsx.quota;
    if (rsh.fullRegionScan) {
      rpcFullScanRequestCount.increment();
    }
    HRegion region = rsh.r;
    LeaseManager.Lease lease;
    try {
      // Remove lease while its being processed in server; protects against case
      // where processing of request takes > lease expiration time. or null if none found.
      lease = server.getLeaseManager().removeLease(scannerName);
    } catch (LeaseException e) {
      throw new ServiceException(e);
    }
    if (request.hasRenew() && request.getRenew()) {
      // add back and return
      addScannerLeaseBack(lease);
      try {
        checkScanNextCallSeq(request, rsh);
      } catch (OutOfOrderScannerNextException e) {
        throw new ServiceException(e);
      }
      return builder.build();
    }
    try {
      checkScanNextCallSeq(request, rsh);
    } catch (OutOfOrderScannerNextException e) {
      addScannerLeaseBack(lease);
      throw new ServiceException(e);
    }
    // Now we have increased the next call sequence. If we give client an error, the retry will
    // never success. So we'd better close the scanner and return a DoNotRetryIOException to client
    // and then client will try to open a new scanner.
    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
    int rows; // this is scan.getCaching
    if (request.hasNumberOfRows()) {
      rows = request.getNumberOfRows();
    } else {
      rows = closeScanner ? 0 : 1;
    }
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    // now let's do the real scan.
    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
    RegionScanner scanner = rsh.s;
    // this is the limit of rows for this scan, if we the number of rows reach this value, we will
    // close the scanner.
    int limitOfRows;
    if (request.hasLimitOfRows()) {
      limitOfRows = request.getLimitOfRows();
    } else {
      limitOfRows = -1;
    }
    boolean scannerClosed = false;
    try {
      List<Result> results = new ArrayList<>(Math.min(rows, 512));
      ServerSideScanMetrics scanMetrics = trackMetrics ? new ServerSideScanMetrics() : null;
      if (rows > 0) {
        boolean done = false;
        // Call coprocessor. Get region info from scanner.
        if (region.getCoprocessorHost() != null) {
          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
          if (!results.isEmpty()) {
            for (Result r : results) {
              // add cell size from CP results so we can track response size and update limits
              // when calling scan below if !done. We'll also have tracked block size if the CP
              // got results from hbase, since StoreScanner tracks that for all calls automatically.
              addSize(rpcCall, r);
            }
          }
          if (bypass != null && bypass.booleanValue()) {
            done = true;
          }
        }
        if (!done) {
          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
            results, builder, rpcCall, scanMetrics);
        } else {
          builder.setMoreResultsInRegion(!results.isEmpty());
        }
      } else {
        // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
        builder.setMoreResultsInRegion(true);
      }

      quota.addScanResult(results);
      addResults(builder, results, (HBaseRpcController) controller,
        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
        isClientCellBlockSupport(rpcCall));
      if (scanner.isFilterDone() && results.isEmpty()) {
        // If the scanner's filter - if any - is done with the scan
        // only set moreResults to false if the results is empty. This is used to keep compatible
        // with the old scan implementation where we just ignore the returned results if moreResults
        // is false. Can remove the isEmpty check after we get rid of the old implementation.
        builder.setMoreResults(false);
      }
      // Later we may close the scanner depending on this flag so here we need to make sure that we
      // have already set this flag.
      assert builder.hasMoreResultsInRegion();
      // we only set moreResults to false in the above code, so set it to true if we haven't set it
      // yet.
      if (!builder.hasMoreResults()) {
        builder.setMoreResults(true);
      }
      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
        // Record the last cell of the last result if it is a partial result
        // We need this to calculate the complete rows we have returned to client as the
        // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
        // current row. We may filter out all the remaining cells for the current row and just
        // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
        // check for row change.
        Result lastResult = results.get(results.size() - 1);
        if (lastResult.mayHaveMoreCellsInRow()) {
          rsh.rowOfLastPartialResult = lastResult.getRow();
        } else {
          rsh.rowOfLastPartialResult = null;
        }
      }
      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
        scannerClosed = true;
        closeScanner(region, scanner, scannerName, rpcCall, false);
      }

      // There's no point returning to a timed out client. Throwing ensures scanner is closed
      if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) {
        throw new TimeoutIOException("Client deadline exceeded, cannot return results");
      }

      if (scanMetrics != null) {
        if (rpcCall != null) {
          long rpcScanTime = EnvironmentEdgeManager.currentTime() - rpcCall.getStartTime();
          long rpcQueueWaitTime = rpcCall.getStartTime() - rpcCall.getReceiveTime();
          scanMetrics.addToCounter(ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME,
            rpcScanTime);
          scanMetrics.addToCounter(ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME,
            rpcQueueWaitTime);
        }
        ThreadLocalServerSideScanMetrics.populateServerSideScanMetrics(scanMetrics);
        Map<String, Long> metrics = scanMetrics.getMetricsMap();
        ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
        NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();

        for (Entry<String, Long> entry : metrics.entrySet()) {
          pairBuilder.setName(entry.getKey());
          pairBuilder.setValue(entry.getValue());
          metricBuilder.addMetrics(pairBuilder.build());
        }

        builder.setScanMetrics(metricBuilder.build());
      }

      return builder.build();
    } catch (IOException e) {
      try {
        // scanner is closed here
        scannerClosed = true;
        // The scanner state might be left in a dirty state, so we will tell the Client to
        // fail this RPC and close the scanner while opening up another one from the start of
        // row that the client has last seen.
        closeScanner(region, scanner, scannerName, rpcCall, true);

        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
        // used in two different semantics.
        // (1) The first is to close the client scanner and bubble up the exception all the way
        // to the application. This is preferred when the exception is really un-recoverable
        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
        // bucket usually.
        // (2) Second semantics is to close the current region scanner only, but continue the
        // client scanner by overriding the exception. This is usually UnknownScannerException,
        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
        // application-level ClientScanner has to continue without bubbling up the exception to
        // the client. See ClientScanner code to see how it deals with these special exceptions.
        if (e instanceof DoNotRetryIOException) {
          throw e;
        }

        // If it is a FileNotFoundException, wrap as a
        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
        if (e instanceof FileNotFoundException) {
          throw new DoNotRetryIOException(e);
        }

        // We closed the scanner already. Instead of throwing the IOException, and client
        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
        // a special exception to save an RPC.
        // NOTE(review): rpcCall comes from RpcServer.getCurrentCall().orElse(null) above and is
        // dereferenced here without a null check - confirm a null rpcCall cannot reach this path.
        if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
          // 1.4.0+ clients know how to handle
          throw new ScannerResetException("Scanner is closed on the server-side", e);
        } else {
          // older clients do not know about SRE. Just throw USE, which they will handle
          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
            + " scanner state for clients older than 1.3.", e);
        }
      } catch (IOException ioe) {
        throw new ServiceException(ioe);
      }
    } finally {
      if (!scannerClosed) {
        // Adding resets expiration time on lease.
        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
        if (rpcCall != null) {
          rpcCall.setCallBack(rsh.shippedCallback);
        } else {
          // If context is null,here we call rsh.shippedCallback directly to reuse the logic in
          // rsh.shippedCallback to release the internal resources in rsh,and lease is also added
          // back to regionserver's LeaseManager in rsh.shippedCallback.
          runShippedCallback(rsh);
        }
      }
      quota.close();
    }
  }
        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
        if (rpcCall != null) {
          // Defer lease renewal / resource release until the response has been shipped.
          rpcCall.setCallBack(rsh.shippedCallback);
        } else {
          // If context is null, here we call rsh.shippedCallback directly to reuse the logic in
          // rsh.shippedCallback to release the internal resources in rsh, and the lease is also
          // added back to the regionserver's LeaseManager in rsh.shippedCallback.
          runShippedCallback(rsh);
        }
      }
      quota.close();
    }
  }

  /**
   * Runs the shipped callback of the given scanner holder immediately (rather than letting the
   * RPC framework run it after shipping the response). The callback releases the internal
   * resources held by the holder.
   * @param rsh the scanner holder whose shipped callback should be run; must be non-null
   * @throws ServiceException wrapping any IOException thrown by the callback
   */
  private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
    assert rsh.shippedCallback != null;
    try {
      rsh.shippedCallback.run();
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
  }

  /**
   * Closes the region scanner registered under {@code scannerName}: offers coprocessors the
   * chance to bypass the close, removes the holder from the scanner map, and closes the scanner
   * either via the RPC close callback (when an RPC context is present) or directly.
   * @param region      the region the scanner reads from
   * @param scanner     the scanner being closed (passed to coprocessor hooks)
   * @param scannerName key of the scanner in the {@code scanners} map
   * @param context     current RPC call context, or null when closing outside an RPC
   * @param isError     true when closing due to an error; a clean close additionally records the
   *                    scanner's next call sequence in {@code closedScanners}
   * @throws IOException if a coprocessor hook or the scanner close fails
   */
  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
    RpcCallContext context, boolean isError) throws IOException {
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preScannerClose(scanner)) {
        // bypass the actual close.
        return;
      }
    }
    RegionScannerHolder rsh = scanners.remove(scannerName);
    if (rsh != null) {
      if (context != null) {
        // Let the RPC layer run the close callback after the response is sent.
        context.setCallBack(rsh.closeCallBack);
      } else {
        // No RPC context; close the wrapped scanner synchronously.
        rsh.s.close();
      }
      if (region.getCoprocessorHost() != null) {
        region.getCoprocessorHost().postScannerClose(scanner);
      }
      if (!isError) {
        // Remember the next call sequence of cleanly closed scanners so later RPCs referencing
        // this scanner id can be recognized.
        closedScanners.put(scannerName, rsh.getNextCallSeq());
      }
    }
  }

  /**
   * Executes a region-server-level coprocessor service call by delegating to the server, after
   * the standard RPC pre-check.
   */
  @Override
  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
    CoprocessorServiceRequest request) throws ServiceException {
    rpcPreCheck("execRegionServerService");
    return server.execRegionServerService(controller, request);
  }

  /**
   * Returns the space quota snapshots currently held by this region server's
   * {@code RegionServerSpaceQuotaManager}. Returns an empty response when no manager is present.
   */
  @Override
  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
    GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
    try {
      final RegionServerSpaceQuotaManager manager = server.getRegionServerSpaceQuotaManager();
      final GetSpaceQuotaSnapshotsResponse.Builder builder =
        GetSpaceQuotaSnapshotsResponse.newBuilder();
      if (manager != null) {
        final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
        for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
            .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
            .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build());
        }
      }
      return builder.build();
    } catch (Exception e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Evicts cached blocks for the requested regions from the block cache, wrapping coprocessor
   * pre/post hooks around the eviction and aggregating per-region eviction stats (including
   * per-region exceptions) into the response.
   */
  @Override
  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
    ClearRegionBlockCacheRequest request) throws ServiceException {
    try {
      rpcPreCheck("clearRegionBlockCache");
      ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder();
      CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
      server.getRegionServerCoprocessorHost().preClearRegionBlockCache();
      List<HRegion> regions = getRegions(request.getRegionList(), stats);
      for (HRegion region : regions) {
        try {
          stats = stats.append(this.server.clearRegionBlockCache(region));
        } catch (Exception e) {
          // Record the per-region failure but keep evicting the remaining regions.
          stats.addException(region.getRegionInfo().getRegionName(), e);
        }
      }
      stats.withMaxCacheSize(server.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
      server.getRegionServerCoprocessorHost().postClearRegionBlockCache(stats.build());
      return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Schedules an {@code AssignRegionHandler} for each region in the open request whose procedure
   * id is accepted by {@code server.submitRegionProcedure}. Table descriptors are looked up once
   * per table and memoized in {@code tdCache} to avoid repeated lookups.
   * @param request the open-region request from the master
   * @param tdCache per-call cache of table descriptors, keyed by table name
   */
  private void executeOpenRegionProcedures(OpenRegionRequest request,
    Map<TableName, TableDescriptor> tdCache) {
    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
    long initiatingMasterActiveTime =
      request.hasInitiatingMasterActiveTime() ? request.getInitiatingMasterActiveTime() : -1;
    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableName tableName = regionInfo.getTable();
      TableDescriptor tableDesc = tdCache.get(tableName);
      if (tableDesc == null) {
        try {
          tableDesc = server.getTableDescriptors().get(regionInfo.getTable());
        } catch (IOException e) {
          // Here we do not fail the whole method since we also need to deal with other
          // procedures, and we can not ignore this one, so we still schedule an
          // AssignRegionHandler and it will report back to the master if we still can not get
          // the TableDescriptor.
          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
            regionInfo.getTable(), e);
        }
        if (tableDesc != null) {
          tdCache.put(tableName, tableDesc);
        }
      }
      if (regionOpenInfo.getFavoredNodesCount() > 0) {
        server.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
          regionOpenInfo.getFavoredNodesList());
      }
      long procId = regionOpenInfo.getOpenProcId();
      if (server.submitRegionProcedure(procId)) {
        server.getExecutorService().submit(AssignRegionHandler.create(server, regionInfo, procId,
          tableDesc, masterSystemTime, initiatingMasterActiveTime));
      }
    }
  }

  /**
   * Schedules an {@code UnassignRegionHandler} for the region named in the close request, if the
   * close procedure id is accepted by {@code server.submitRegionProcedure}.
   * @param request the close-region request from the master
   */
  private void executeCloseRegionProcedures(CloseRegionRequest request) {
    String encodedName;
    long initiatingMasterActiveTime =
      request.hasInitiatingMasterActiveTime() ? request.getInitiatingMasterActiveTime() : -1;
    try {
      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
    } catch (DoNotRetryIOException e) {
      // The region specifier came from the master, so it should always be decodable.
      throw new UncheckedIOException("Should not happen", e);
    }
    ServerName destination = request.hasDestinationServer()
      ? ProtobufUtil.toServerName(request.getDestinationServer())
      : null;
    long procId = request.getCloseProcId();
    boolean evictCache = request.getEvictCache();
    if (server.submitRegionProcedure(procId)) {
      server.getExecutorService().submit(UnassignRegionHandler.create(server, encodedName, procId,
        false, destination, evictCache, initiatingMasterActiveTime));
    }
  }

  /**
   * Instantiates the {@code RSProcedureCallable} named in the request via reflection and hands it
   * to the server for execution. Instantiation failures are reported back to the master through
   * {@code remoteProcedureComplete} instead of being thrown.
   * @param request the remote procedure request carrying the callable class name and payload
   */
  private void executeProcedures(RemoteProcedureRequest request) {
    RSProcedureCallable callable;
    try {
      callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
        .getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(),
        request.getProcId(), e);
      server.remoteProcedureComplete(request.getProcId(), request.getInitiatingMasterActiveTime(),
        e, null);
      return;
    }
    callable.init(request.getProcData().toByteArray(), server);
    LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
    server.executeProcedure(request.getProcId(), request.getInitiatingMasterActiveTime(), callable);
  }

  /**
   * Master-to-regionserver RPC that dispatches batched open-region, close-region and generic
   * remote procedures, wrapped in coprocessor pre/post hooks.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ExecuteProceduresResponse executeProcedures(RpcController controller,
    ExecuteProceduresRequest request) throws ServiceException {
    try {
      checkOpen();
      throwOnWrongStartCode(request);
      server.getRegionServerCoprocessorHost().preExecuteProcedures();
      if (request.getOpenRegionCount() > 0) {
        // Avoid reading the TableDescriptor every time (usually it will read from the file
        // system)
        Map<TableName, TableDescriptor> tdCache = new HashMap<>();
        request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
      }
      if (request.getCloseRegionCount() > 0) {
        request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
      }
      if (request.getProcCount() > 0) {
        request.getProcList().forEach(this::executeProcedures);
      }
      server.getRegionServerCoprocessorHost().postExecuteProcedures();
      return ExecuteProceduresResponse.getDefaultInstance();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Returns all bootstrap nodes known to this region server, converted to protobuf server names.
   */
  @Override
  public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller,
    GetAllBootstrapNodesRequest request) throws ServiceException {
    GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder();
    // NOTE(review): the lambda parameter 'server' shadows the enclosing 'server' field;
    // consider renaming it (e.g. to 'node') for clarity.
    server.getBootstrapNodes()
      .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server)));
    return builder.build();
  }

  /**
   * (Re)loads the guardrail settings that can be changed at runtime: the batch-rows warning
   * threshold, whether to reject batches over that threshold, and the server-side max scanner
   * result size.
   * @param conf the configuration to read the values from
   */
  private void setReloadableGuardrails(Configuration conf) {
    rowSizeWarnThreshold =
      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    rejectRowsWithSizeOverThreshold =
      conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
    maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
  }

  /**
   * Applies a changed configuration: runs the superclass handling, then refreshes the reloadable
   * guardrail settings.
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    super.onConfigurationChange(conf);
    setReloadableGuardrails(conf);
  }

  /**
   * Returns the list of files that are fully cached in the block cache, or an empty list when no
   * block cache (or no fully-cached-files view) is available.
   */
  @Override
  public GetCachedFilesListResponse getCachedFilesList(RpcController controller,
    GetCachedFilesListRequest request) throws ServiceException {
    GetCachedFilesListResponse.Builder responseBuilder = GetCachedFilesListResponse.newBuilder();
    List<String> fullyCachedFiles = new ArrayList<>();
    server.getBlockCache().flatMap(BlockCache::getFullyCachedFiles).ifPresent(fcf -> {
      fullyCachedFiles.addAll(fcf.keySet());
    });
    return responseBuilder.addAllCachedFiles(fullyCachedFiles).build();
  }

  /**
   * STUB - Refreshes the system key cache on the region server. Feature not yet implemented in
   * precursor PR.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public EmptyMsg refreshSystemKeyCache(final RpcController controller, final EmptyMsg request)
    throws ServiceException {
    throw new ServiceException(
      new UnsupportedOperationException("Key management feature not yet implemented"));
  }

  /**
   * STUB - Ejects a specific managed key entry from the cache. Feature not yet implemented in
   * precursor PR.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public BooleanMsg ejectManagedKeyDataCacheEntry(final RpcController controller,
    final ManagedKeyEntryRequest request) throws ServiceException {
    throw new ServiceException(
      new UnsupportedOperationException("Key management feature not yet implemented"));
  }

  /**
   * STUB - Clears all entries in the managed key data cache. Feature not yet implemented in
   * precursor PR.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public EmptyMsg clearManagedKeyDataCache(final RpcController controller, final EmptyMsg request)
    throws ServiceException {
    throw new ServiceException(
      new UnsupportedOperationException("Key management feature not yet implemented"));
  }

  /**
   * Checks the scan quota for the request and resolves the scanner context. When the request
   * carries a scanner id, the existing scanner holder is looked up and its previously observed
   * block-bytes-scanned figures are fed into the quota check; otherwise a new region scanner is
   * created for the target region.
   * @param request the scan request, with or without an existing scanner id
   * @param builder response builder; the scanner id is echoed into it for existing scanners
   * @return the scanner name, holder and checked quota bundled as a RegionScannerContext
   * @throws IOException if the scanner lookup/creation or the quota check fails
   */
  RegionScannerContext checkQuotaAndGetRegionScannerContext(ScanRequest request,
    ScanResponse.Builder builder) throws IOException {
    if (request.hasScannerId()) {
      // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
      // for more details.
      long scannerId = request.getScannerId();
      builder.setScannerId(scannerId);
      String scannerName = toScannerName(scannerId);
      RegionScannerHolder rsh = getRegionScanner(request);
      OperationQuota quota =
        getRpcQuotaManager().checkScanQuota(rsh.r, request, maxScannerResultSize,
          rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference());
      return new RegionScannerContext(scannerName, rsh, quota);
    }

    HRegion region = getRegion(request.getRegion());
    OperationQuota quota =
      getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize, 0L, 0L);
    Pair<String, RegionScannerHolder> pair = newRegionScanner(request, region, builder);
    return new RegionScannerContext(pair.getFirst(), pair.getSecond(), quota);
  }
}