001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.UncheckedIOException; 024import java.lang.reflect.InvocationTargetException; 025import java.lang.reflect.Method; 026import java.net.BindException; 027import java.net.InetAddress; 028import java.net.InetSocketAddress; 029import java.nio.ByteBuffer; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.Collections; 033import java.util.HashMap; 034import java.util.Iterator; 035import java.util.List; 036import java.util.Map; 037import java.util.Map.Entry; 038import java.util.NavigableMap; 039import java.util.Optional; 040import java.util.Set; 041import java.util.TreeSet; 042import java.util.concurrent.ConcurrentHashMap; 043import java.util.concurrent.ConcurrentMap; 044import java.util.concurrent.TimeUnit; 045import java.util.concurrent.atomic.AtomicBoolean; 046import java.util.concurrent.atomic.AtomicLong; 047import java.util.concurrent.atomic.LongAdder; 048import org.apache.commons.lang3.mutable.MutableObject; 
049import org.apache.hadoop.conf.Configuration; 050import org.apache.hadoop.fs.Path; 051import org.apache.hadoop.hbase.ByteBufferExtendedCell; 052import org.apache.hadoop.hbase.CacheEvictionStats; 053import org.apache.hadoop.hbase.CacheEvictionStatsBuilder; 054import org.apache.hadoop.hbase.Cell; 055import org.apache.hadoop.hbase.CellScannable; 056import org.apache.hadoop.hbase.CellScanner; 057import org.apache.hadoop.hbase.CellUtil; 058import org.apache.hadoop.hbase.DoNotRetryIOException; 059import org.apache.hadoop.hbase.DroppedSnapshotException; 060import org.apache.hadoop.hbase.HBaseIOException; 061import org.apache.hadoop.hbase.HConstants; 062import org.apache.hadoop.hbase.HRegionLocation; 063import org.apache.hadoop.hbase.MultiActionResultTooLarge; 064import org.apache.hadoop.hbase.NotServingRegionException; 065import org.apache.hadoop.hbase.PrivateCellUtil; 066import org.apache.hadoop.hbase.RegionTooBusyException; 067import org.apache.hadoop.hbase.Server; 068import org.apache.hadoop.hbase.ServerName; 069import org.apache.hadoop.hbase.TableName; 070import org.apache.hadoop.hbase.UnknownScannerException; 071import org.apache.hadoop.hbase.client.Append; 072import org.apache.hadoop.hbase.client.CheckAndMutate; 073import org.apache.hadoop.hbase.client.CheckAndMutateResult; 074import org.apache.hadoop.hbase.client.ConnectionUtils; 075import org.apache.hadoop.hbase.client.Delete; 076import org.apache.hadoop.hbase.client.Durability; 077import org.apache.hadoop.hbase.client.Get; 078import org.apache.hadoop.hbase.client.Increment; 079import org.apache.hadoop.hbase.client.Mutation; 080import org.apache.hadoop.hbase.client.OperationWithAttributes; 081import org.apache.hadoop.hbase.client.Put; 082import org.apache.hadoop.hbase.client.RegionInfo; 083import org.apache.hadoop.hbase.client.RegionReplicaUtil; 084import org.apache.hadoop.hbase.client.Result; 085import org.apache.hadoop.hbase.client.Row; 086import org.apache.hadoop.hbase.client.Scan; 087import 
org.apache.hadoop.hbase.client.TableDescriptor; 088import org.apache.hadoop.hbase.client.VersionInfoUtil; 089import org.apache.hadoop.hbase.conf.ConfigurationObserver; 090import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 091import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 092import org.apache.hadoop.hbase.exceptions.ScannerResetException; 093import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 094import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 095import org.apache.hadoop.hbase.io.ByteBuffAllocator; 096import org.apache.hadoop.hbase.io.hfile.BlockCache; 097import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; 098import org.apache.hadoop.hbase.ipc.HBaseRpcController; 099import org.apache.hadoop.hbase.ipc.PriorityFunction; 100import org.apache.hadoop.hbase.ipc.QosPriority; 101import org.apache.hadoop.hbase.ipc.RpcCall; 102import org.apache.hadoop.hbase.ipc.RpcCallContext; 103import org.apache.hadoop.hbase.ipc.RpcCallback; 104import org.apache.hadoop.hbase.ipc.RpcScheduler; 105import org.apache.hadoop.hbase.ipc.RpcServer; 106import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 107import org.apache.hadoop.hbase.ipc.RpcServerFactory; 108import org.apache.hadoop.hbase.ipc.RpcServerInterface; 109import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 110import org.apache.hadoop.hbase.ipc.ServerRpcController; 111import org.apache.hadoop.hbase.log.HBaseMarkers; 112import org.apache.hadoop.hbase.master.HMaster; 113import org.apache.hadoop.hbase.master.MasterRpcServices; 114import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; 115import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; 116import org.apache.hadoop.hbase.namequeues.RpcLogDetails; 117import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; 118import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; 119import org.apache.hadoop.hbase.net.Address; 120import 
org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 121import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; 122import org.apache.hadoop.hbase.quotas.OperationQuota; 123import org.apache.hadoop.hbase.quotas.QuotaUtil; 124import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 125import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 126import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 127import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; 128import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease; 129import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException; 130import org.apache.hadoop.hbase.regionserver.Region.Operation; 131import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 132import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 133import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler; 134import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; 135import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; 136import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; 137import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; 138import org.apache.hadoop.hbase.security.Superusers; 139import org.apache.hadoop.hbase.security.User; 140import org.apache.hadoop.hbase.security.access.AccessChecker; 141import org.apache.hadoop.hbase.security.access.NoopAccessChecker; 142import org.apache.hadoop.hbase.security.access.Permission; 143import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; 144import org.apache.hadoop.hbase.util.Bytes; 145import org.apache.hadoop.hbase.util.DNS; 146import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 147import org.apache.hadoop.hbase.util.Pair; 148import org.apache.hadoop.hbase.util.ReservoirSample; 149import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 150import org.apache.hadoop.hbase.wal.WAL; 
151import org.apache.hadoop.hbase.wal.WALEdit; 152import org.apache.hadoop.hbase.wal.WALKey; 153import org.apache.hadoop.hbase.wal.WALSplitUtil; 154import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 155import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 156import org.apache.yetus.audience.InterfaceAudience; 157import org.apache.zookeeper.KeeperException; 158import org.slf4j.Logger; 159import org.slf4j.LoggerFactory; 160 161import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 162import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 163import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 164import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 165import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 166import org.apache.hbase.thirdparty.com.google.protobuf.Message; 167import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 168import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 169import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 170import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 171import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 172 173import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 174import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 175import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 181import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 202import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; 222import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 243import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 249import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 250import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 251import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 252import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 253import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; 254import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; 255import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 256import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 257import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics; 258import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 259import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 260import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; 261import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 262import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; 263import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest; 264import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse; 265import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest; 266import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse; 267import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest; 268import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse; 269import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest; 270import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse; 271import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry; 272import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest; 273import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse; 274import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; 275import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 276import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 277import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 278import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 279 280/** 281 * Implements the regionserver RPC services. 282 */ 283@InterfaceAudience.Private 284@SuppressWarnings("deprecation") 285public class RSRpcServices 286 implements HBaseRPCErrorHandler, AdminService.BlockingInterface, ClientService.BlockingInterface, 287 ClientMetaService.BlockingInterface, PriorityFunction, ConfigurationObserver { 288 protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class); 289 290 /** RPC scheduler to use for the region server. 
   */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

  /** RPC scheduler to use for the master. */
  public static final String MASTER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.master.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";

  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  /** Config key limiting how many bootstrap nodes are handed out to clients. */
  public static final String CLIENT_BOOTSTRAP_NODE_LIMIT = "hbase.client.bootstrap.node.limit";

  /** Default value of {@link RSRpcServices#CLIENT_BOOTSTRAP_NODE_LIMIT}. */
  public static final int DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT = 10;

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Server to handle client requests.
  final RpcServerInterface rpcServer;
  // Socket address of the RPC endpoint; presumably assigned in the constructor (not visible here).
  final InetSocketAddress isa;

  // Owning region server; used throughout for leases, metrics, region lookup, etc.
  protected final HRegionServer regionServer;
  private final long maxScannerResultSize;

  // The reference to the priority extraction function
  private final PriorityFunction priority;

  // Generates ids for client scanners; non-final, so initialized after construction (not shown).
  private ScannerIdGenerator scannerIdGenerator;
  // Currently open client scanners, keyed by scanner name.
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
  // which may send next or close request to a region scanner which has already been exhausted. The
  // entries will be removed automatically after scannerLeaseTimeoutPeriod.
  private final Cache<String, String> closedScanners;

  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;

  /*
   * Whether we should reject requests with very high no of rows i.e. beyond threshold defined by
   * rowSizeWarnThreshold
   */
  private final boolean rejectRowsWithSizeOverThreshold;

  // NOTE(review): looks like a guard against concurrent clearCompactionQueues RPCs — confirm at
  // the usage site (not visible in this chunk).
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  // Authorization helpers; non-final, so presumably wired up after construction (not shown here).
  private AccessChecker accessChecker;
  private ZKPermissionWatcher zkPermissionWatcher;

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below booleans
   * to selectively enable/disable either Admin or Client Service (Rare is the case where you would
   * ever turn off one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
    "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
    "hbase.regionserver.client.executorService";
  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
    "hbase.regionserver.client.meta.executorService";

  /**
   * An Rpc callback for closing a RegionScanner.
   */
  private static final class RegionScannerCloseCallBack implements RpcCallback {

    // The scanner to close when the RPC completes.
    private final RegionScanner scanner;

    public RegionScannerCloseCallBack(RegionScanner scanner) {
      this.scanner = scanner;
    }

    @Override
    public void run() throws IOException {
      this.scanner.close();
    }
  }

  /**
   * An Rpc callback for doing shipped() call on a RegionScanner.
   */
  private class RegionScannerShippedCallBack implements RpcCallback {
    private final String scannerName;
    private final Shipper shipper;
    // Lease that was removed for the duration of the RPC; re-added in run() if still applicable.
    private final Lease lease;

    public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
      this.scannerName = scannerName;
      this.shipper = shipper;
      this.lease = lease;
    }

    @Override
    public void run() throws IOException {
      this.shipper.shipped();
      // We're done. On way out re-add the above removed lease. The lease was temp removed for this
      // Rpc call and we are at end of the call now. Time to add it back.
      // Only re-add while the scanner is still registered; if it was removed concurrently
      // (closed or lease expired) the lease must stay gone.
      if (scanners.containsKey(scannerName)) {
        if (lease != null) {
          regionServer.getLeaseManager().addLease(lease);
        }
      }
    }
  }

  /**
   * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
   * completion of multiGets.
   */
  static class RegionScannersCloseCallBack implements RpcCallback {
    // Scanners accumulated during the multi-get; all closed together in run().
    private final List<RegionScanner> scanners = new ArrayList<>();

    public void addScanner(RegionScanner scanner) {
      this.scanners.add(scanner);
    }

    @Override
    public void run() {
      // Best-effort close: log and keep going so one failure does not leak the remaining scanners.
      for (RegionScanner scanner : scanners) {
        try {
          scanner.close();
        } catch (IOException e) {
          LOG.error("Exception while closing the scanner " + scanner, e);
        }
      }
    }
  }

  /**
   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
   */
  static final class RegionScannerHolder {
    // Call sequence expected from the client's next scan request; advanced via CAS in
    // incNextCallSeq() so two requests cannot run on the same scanner concurrently.
    private final AtomicLong nextCallSeq = new AtomicLong(0);
    private final RegionScanner s;
    private final HRegion r;
    private final RpcCallback closeCallBack;
    private final RpcCallback shippedCallback;
    private byte[] rowOfLastPartialResult;
    private boolean needCursor;
    private boolean fullRegionScan;
    // Kept for diagnostics; see toString() below.
    private final String clientIPAndPort;
    private final String userName;

    RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
      RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
      String clientIPAndPort, String userName) {
      this.s = s;
      this.r = r;
      this.closeCallBack = closeCallBack;
      this.shippedCallback = shippedCallback;
      this.needCursor = needCursor;
      this.fullRegionScan = fullRegionScan;
      this.clientIPAndPort = clientIPAndPort;
      this.userName = userName;
    }

    long getNextCallSeq() {
      return nextCallSeq.get();
    }

    boolean incNextCallSeq(long currentSeq) {
      // Use CAS to prevent multiple scan request running on the same scanner.
      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
    }

    // Should be called only when we need to print lease expired messages otherwise
    // cache the String once made.
    @Override
    public String toString() {
      return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName
        + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString();
    }
  }

  /**
   * Instantiated as a scanner lease.
   * If the lease times out, the scanner is closed.
   */
  private class ScannerListener implements LeaseListener {
    private final String scannerName;

    ScannerListener(final String n) {
      this.scannerName = n;
    }

    @Override
    public void leaseExpired() {
      // Deregister first so concurrent RPCs / shipped callbacks see the scanner as gone.
      RegionScannerHolder rsh = scanners.remove(this.scannerName);
      if (rsh == null) {
        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
        return;
      }
      LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
      regionServer.getMetrics().incrScannerLeaseExpired();
      RegionScanner s = rsh.s;
      HRegion region = null;
      try {
        region = regionServer.getRegion(s.getRegionInfo().getRegionName());
        if (region != null && region.getCoprocessorHost() != null) {
          region.getCoprocessorHost().preScannerClose(s);
        }
      } catch (IOException e) {
        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
      } finally {
        // Always close the scanner itself, even if lookup or the pre-close hook failed.
        try {
          s.close();
          if (region != null && region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().postScannerClose(s);
          }
        } catch (IOException e) {
          LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
        }
      }
    }
  }

  /** Wraps a protobuf Result into a ResultOrException positioned at {@code index}. */
  private static ResultOrException getResultOrException(final ClientProtos.Result r,
    final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(r), index);
  }

  /** Wraps an Exception into a ResultOrException positioned at {@code index}. */
  private static ResultOrException getResultOrException(final Exception e, final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(e), index);
  }

  /** Stamps {@code index} on the builder and builds the final ResultOrException. */
  private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
    final int index) {
    return builder.setIndex(index).build();
  }

  /**
   * Checks for the following pre-checks in order:
   * <ol>
   * <li>RegionServer is running</li>
   * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
 * </ol>
 * @param requestName name of rpc request. Used in reporting failures to provide context.
 * @throws ServiceException If any of the above listed pre-check fails.
 */
private void rpcPreCheck(String requestName) throws ServiceException {
  try {
    checkOpen();
    requirePermission(requestName, Permission.Action.ADMIN);
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}

/** Returns true if the calling client negotiated cell-block (codec) support on this call. */
private boolean isClientCellBlockSupport(RpcCallContext context) {
  return context != null && context.isClientCellBlockSupported();
}

/**
 * Set <code>result</code> on the response builder. When the client supports cell blocks the
 * cells travel out-of-band on the controller's cell scanner; otherwise the full protobuf
 * Result (data included) is set inline.
 */
private void addResult(final MutateResponse.Builder builder, final Result result,
  final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
  if (result == null) return;
  if (clientCellBlockSupported) {
    builder.setResult(ProtobufUtil.toResultNoData(result));
    rpcc.setCellScanner(result.cellScanner());
  } else {
    ClientProtos.Result pbr = ProtobufUtil.toResult(result);
    builder.setResult(pbr);
  }
}

/**
 * Add scan results to the ScanResponse builder. With cell-block support only per-result cell
 * counts/partial flags go in the response and the cells ride the controller's cell scanner;
 * otherwise each Result is serialized inline as protobuf.
 */
private void addResults(ScanResponse.Builder builder, List<Result> results,
  HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
  builder.setStale(!isDefaultRegion);
  if (results.isEmpty()) {
    return;
  }
  if (clientCellBlockSupported) {
    for (Result res : results) {
      builder.addCellsPerResult(res.size());
      builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
    }
    controller.setCellScanner(CellUtil.createCellScanner(results));
  } else {
    for (Result res : results) {
      ClientProtos.Result pbr = ProtobufUtil.toResult(res);
      builder.addResults(pbr);
    }
  }
}

/**
 * Run an atomic check-and-mutate over the given mutation actions, invoking coprocessor
 * pre/post hooks. Gets are rejected. Cells belonging to actions that were never fully
 * deserialized are skipped in the finally block so the shared cell scanner stays aligned.
 */
private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
  CellScanner cellScanner, Condition condition, long nonceGroup,
  ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
  int countOfCompleteMutation = 0;
  try {
    if (!region.getRegionInfo().isMetaRegion()) {
      regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
    }
    List<Mutation> mutations = new ArrayList<>();
    long nonce = HConstants.NO_NONCE;
    for (ClientProtos.Action action : actions) {
      if (action.hasGet()) {
        throw new DoNotRetryIOException(
          "Atomic put and/or delete only, not a Get=" + action.getGet());
      }
      MutationProto mutation = action.getMutation();
      MutationType type = mutation.getMutateType();
      switch (type) {
        case PUT:
          Put put = ProtobufUtil.toPut(mutation, cellScanner);
          ++countOfCompleteMutation;
          checkCellSizeLimit(region, put);
          spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
          mutations.add(put);
          break;
        case DELETE:
          Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
          ++countOfCompleteMutation;
          spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
          mutations.add(del);
          break;
        case INCREMENT:
          Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
          nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
          ++countOfCompleteMutation;
          checkCellSizeLimit(region, increment);
          spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
          mutations.add(increment);
          break;
        case APPEND:
          Append append = ProtobufUtil.toAppend(mutation, cellScanner);
          nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
          ++countOfCompleteMutation;
          checkCellSizeLimit(region, append);
          spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
          mutations.add(append);
          break;
        default:
          throw new DoNotRetryIOException("invalid mutation type : " + type);
      }
    }

    if (mutations.size() == 0) {
      return new CheckAndMutateResult(true, null);
    } else {
      CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
      CheckAndMutateResult result = null;
      if (region.getCoprocessorHost() != null) {
        result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
      }
      // A non-null pre-hook result means the coprocessor bypassed the default operation.
      if (result == null) {
        result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
        if (region.getCoprocessorHost() != null) {
          result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
        }
      }
      return result;
    }
  } finally {
    // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
    // even if the malformed cells are not skipped.
    for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
      skipCellsForMutation(actions.get(i), cellScanner);
    }
  }
}

/**
 * Execute an append mutation.
 * @return result to return to client if default operation should be bypassed as indicated by
 *         RegionObserver, null otherwise
 */
private Result append(final HRegion region, final OperationQuota quota,
  final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
  ActivePolicyEnforcement spaceQuota) throws IOException {
  long before = EnvironmentEdgeManager.currentTime();
  Append append = ProtobufUtil.toAppend(mutation, cellScanner);
  checkCellSizeLimit(region, append);
  spaceQuota.getPolicyEnforcement(region).check(append);
  quota.addMutation(append);
  long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
  Result r = region.append(append, nonceGroup, nonce);
  if (regionServer.getMetrics() != null) {
    regionServer.getMetrics().updateAppend(region.getTableDescriptor().getTableName(),
      EnvironmentEdgeManager.currentTime() - before);
  }
  return r == null ? Result.EMPTY_RESULT : r;
}

/**
 * Execute an increment mutation.
 */
private Result increment(final HRegion region, final OperationQuota quota,
  final MutationProto mutation, final CellScanner cells, long nonceGroup,
  ActivePolicyEnforcement spaceQuota) throws IOException {
  long before = EnvironmentEdgeManager.currentTime();
  Increment increment = ProtobufUtil.toIncrement(mutation, cells);
  checkCellSizeLimit(region, increment);
  spaceQuota.getPolicyEnforcement(region).check(increment);
  quota.addMutation(increment);
  long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
  Result r = region.increment(increment, nonceGroup, nonce);
  final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
  if (metricsRegionServer != null) {
    metricsRegionServer.updateIncrement(region.getTableDescriptor().getTableName(),
      EnvironmentEdgeManager.currentTime() - before);
  }
  return r == null ? Result.EMPTY_RESULT : r;
}

/**
 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
 * @param cellsToReturn Could be null. May be allocated in this method. This is what this method
 *          returns as a 'result'.
 * @param closeCallBack the callback to be used with multigets
 * @param context the current RpcCallContext
 * @return Return the <code>cellScanner</code> passed
 */
private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
  final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
  final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
  final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
  ActivePolicyEnforcement spaceQuotaEnforcement) {
  // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
  // one at a time, we instead pass them in batch. Be aware that the corresponding
  // ResultOrException instance that matches each Put or Delete is then added down in the
  // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
  // deferred/batched
  List<ClientProtos.Action> mutations = null;
  long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
  IOException sizeIOE = null;
  Object lastBlock = null;
  ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
    ResultOrException.newBuilder();
  boolean hasResultOrException = false;
  for (ClientProtos.Action action : actions.getActionList()) {
    hasResultOrException = false;
    resultOrExceptionBuilder.clear();
    try {
      Result r = null;

      if (
        context != null && context.isRetryImmediatelySupported()
          && (context.getResponseCellSize() > maxQuotaResultSize
            || context.getResponseBlockSize() + context.getResponseExceptionSize()
                > maxQuotaResultSize)
      ) {

        // We're storing the exception since the exception and reason string won't
        // change after the response size limit is reached.
        if (sizeIOE == null) {
          // We don't need the stack un-winding so don't throw the exception.
          // Throwing will kill the JVM's JIT.
          //
          // Instead just create the exception and then store it.
          sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
            + context.getResponseCellSize() + " BlockSize: " + context.getResponseBlockSize());

          // Only report the exception once since there's only one request that
          // caused the exception. Otherwise this number will dominate the exceptions count.
          rpcServer.getMetrics().exception(sizeIOE);
        }

        // Now that there's an exception is known to be created
        // use it for the response.
        //
        // This will create a copy in the builder.
        NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
        resultOrExceptionBuilder.setException(pair);
        context.incrementResponseExceptionSize(pair.getSerializedSize());
        resultOrExceptionBuilder.setIndex(action.getIndex());
        builder.addResultOrException(resultOrExceptionBuilder.build());
        skipCellsForMutation(action, cellScanner);
        continue;
      }
      if (action.hasGet()) {
        long before = EnvironmentEdgeManager.currentTime();
        ClientProtos.Get pbGet = action.getGet();
        // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
        // a get closest before. Throwing the UnknownProtocolException signals it that it needs
        // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
        // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
        if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
          throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
            + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
            + "reverse Scan.");
        }
        try {
          Get get = ProtobufUtil.toGet(pbGet);
          if (context != null) {
            r = get(get, (region), closeCallBack, context);
          } else {
            r = region.get(get);
          }
        } finally {
          final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
          if (metricsRegionServer != null) {
            metricsRegionServer.updateGet(region.getTableDescriptor().getTableName(),
              EnvironmentEdgeManager.currentTime() - before);
          }
        }
      } else if (action.hasServiceCall()) {
        hasResultOrException = true;
        com.google.protobuf.Message result = execServiceOnRegion(region, action.getServiceCall());
        ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
          ClientProtos.CoprocessorServiceResult.newBuilder();
        resultOrExceptionBuilder.setServiceResult(serviceResultBuilder
          .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName())
            // TODO: Copy!!!
            .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
      } else if (action.hasMutation()) {
        MutationType type = action.getMutation().getMutateType();
        if (
          type != MutationType.PUT && type != MutationType.DELETE && mutations != null
            && !mutations.isEmpty()
        ) {
          // Flush out any Puts or Deletes already collected.
          doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
            spaceQuotaEnforcement);
          mutations.clear();
        }
        switch (type) {
          case APPEND:
            r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
              spaceQuotaEnforcement);
            break;
          case INCREMENT:
            r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
              spaceQuotaEnforcement);
            break;
          case PUT:
          case DELETE:
            // Collect the individual mutations and apply in a batch
            if (mutations == null) {
              mutations = new ArrayList<>(actions.getActionCount());
            }
            mutations.add(action);
            break;
          default:
            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
        }
      } else {
        throw new HBaseIOException("Unexpected Action type");
      }
      if (r != null) {
        ClientProtos.Result pbResult = null;
        if (isClientCellBlockSupport(context)) {
          pbResult = ProtobufUtil.toResultNoData(r);
          // Hard to guess the size here. Just make a rough guess.
          if (cellsToReturn == null) {
            cellsToReturn = new ArrayList<>();
          }
          cellsToReturn.add(r);
        } else {
          pbResult = ProtobufUtil.toResult(r);
        }
        lastBlock = addSize(context, r, lastBlock);
        hasResultOrException = true;
        resultOrExceptionBuilder.setResult(pbResult);
      }
      // Could get to here and there was no result and no exception. Presumes we added
      // a Put or Delete to the collecting Mutations List for adding later. In this
      // case the corresponding ResultOrException instance for the Put or Delete will be added
      // down in the doNonAtomicBatchOp method call rather than up here.
    } catch (IOException ie) {
      rpcServer.getMetrics().exception(ie);
      hasResultOrException = true;
      NameBytesPair pair = ResponseConverter.buildException(ie);
      resultOrExceptionBuilder.setException(pair);
      context.incrementResponseExceptionSize(pair.getSerializedSize());
    }
    if (hasResultOrException) {
      // Propagate index.
      resultOrExceptionBuilder.setIndex(action.getIndex());
      builder.addResultOrException(resultOrExceptionBuilder.build());
    }
  }
  // Finish up any outstanding mutations
  if (!CollectionUtils.isEmpty(mutations)) {
    doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
  }
  return cellsToReturn;
}

/**
 * Reject any single cell in <code>m</code> larger than the region's configured max cell size.
 * @throws DoNotRetryIOException if a cell exceeds the limit
 */
private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
  if (r.maxCellSize > 0) {
    CellScanner cells = m.cellScanner();
    while (cells.advance()) {
      int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
      if (size > r.maxCellSize) {
        String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of "
          + r.maxCellSize + " bytes";
        LOG.debug(msg);
        throw new DoNotRetryIOException(msg);
      }
    }
  }
}

/** Delegates to {@link #doBatchOp} with atomic=true, letting any IOException propagate. */
private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
  final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
  long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
  // Just throw the exception. The exception will be caught and then added to region-level
  // exception for RegionAction. Leaving the null to action result is ok since the null
  // result is viewed as failure by hbase client. And the region-level exception will be used
  // to replace the null result. see AsyncRequestFutureImpl#receiveMultiAction and
  // AsyncBatchRpcRetryingCaller#onComplete for more details.
  doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
}

/**
 * Delegates to {@link #doBatchOp} with atomic=false; on IOException, records the exception as
 * the per-action result for every mutation in this batch instead of failing the RegionAction.
 */
private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
  final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
  ActivePolicyEnforcement spaceQuotaEnforcement) {
  try {
    doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
      spaceQuotaEnforcement, false);
  } catch (IOException e) {
    // Set the exception for each action. The mutations in same RegionAction are group to
    // different batch and then be processed individually. Hence, we don't set the region-level
    // exception here for whole RegionAction.
    for (Action mutation : mutations) {
      builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
    }
  }
}

/**
 * Execute a list of mutations.
 */
private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
  final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
  long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
  throws IOException {
  Mutation[] mArray = new Mutation[mutations.size()];
  long before = EnvironmentEdgeManager.currentTime();
  boolean batchContainsPuts = false, batchContainsDelete = false;
  try {
    /**
     * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions
     * since mutation array may have been reordered. In order to return the right result or
     * exception to the corresponding actions, We need to know which action is the mutation belong
     * to. We can't sort ClientProtos.Action array, since they are bonded to cellscanners.
     */
    Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
    int i = 0;
    long nonce = HConstants.NO_NONCE;
    for (ClientProtos.Action action : mutations) {
      if (action.hasGet()) {
        throw new DoNotRetryIOException(
          "Atomic put and/or delete only, not a Get=" + action.getGet());
      }
      MutationProto m = action.getMutation();
      Mutation mutation;
      switch (m.getMutateType()) {
        case PUT:
          mutation = ProtobufUtil.toPut(m, cells);
          batchContainsPuts = true;
          break;

        case DELETE:
          mutation = ProtobufUtil.toDelete(m, cells);
          batchContainsDelete = true;
          break;

        case INCREMENT:
          mutation = ProtobufUtil.toIncrement(m, cells);
          nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
          break;

        case APPEND:
          mutation = ProtobufUtil.toAppend(m, cells);
          nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
          break;

        default:
          throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
      }
      mutationActionMap.put(mutation, action);
      mArray[i++] = mutation;
      checkCellSizeLimit(region, mutation);
      // Check if a space quota disallows this mutation
      spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
      quota.addMutation(mutation);
    }

    if (!region.getRegionInfo().isMetaRegion()) {
      regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
    }

    // HBASE-17924
    // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
    // order is preserved as its expected from the client
    if (!atomic) {
      Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
    }

    OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);

    // When atomic is true, it indicates that the mutateRow API or the batch API with
    // RowMutations is called. In this case, we need to merge the results of the
    // Increment/Append operations if the mutations include those operations, and set the merged
    // result to the first element of the ResultOrException list
    if (atomic) {
      List<ResultOrException> resultOrExceptions = new ArrayList<>();
      List<Result> results = new ArrayList<>();
      for (i = 0; i < codes.length; i++) {
        if (codes[i].getResult() != null) {
          results.add(codes[i].getResult());
        }
        if (i != 0) {
          resultOrExceptions
            .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i));
        }
      }

      if (results.isEmpty()) {
        builder.addResultOrException(
          getResultOrException(ClientProtos.Result.getDefaultInstance(), 0));
      } else {
        // Merge the results of the Increment/Append operations
        List<Cell> cellList = new ArrayList<>();
        for (Result result : results) {
          if (result.rawCells() != null) {
            cellList.addAll(Arrays.asList(result.rawCells()));
          }
        }
        Result result = Result.create(cellList);

        // Set the merged result of the Increment/Append operations to the first element of the
        // ResultOrException list
        builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
      }

      builder.addAllResultOrException(resultOrExceptions);
      return;
    }

    for (i = 0; i < codes.length; i++) {
      Mutation currentMutation = mArray[i];
      ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
      int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
      Exception e;
      // NOTE: the default label sits before SUCCESS/STORE_TOO_BUSY; each arm breaks, so
      // matching behavior is unaffected by the unusual ordering.
      switch (codes[i].getOperationStatusCode()) {
        case BAD_FAMILY:
          e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
          builder.addResultOrException(getResultOrException(e, index));
          break;

        case SANITY_CHECK_FAILURE:
          e = new FailedSanityCheckException(codes[i].getExceptionMsg());
          builder.addResultOrException(getResultOrException(e, index));
          break;

        default:
          e = new DoNotRetryIOException(codes[i].getExceptionMsg());
          builder.addResultOrException(getResultOrException(e, index));
          break;

        case SUCCESS:
          builder.addResultOrException(
            getResultOrException(ClientProtos.Result.getDefaultInstance(), index));
          break;

        case STORE_TOO_BUSY:
          e = new RegionTooBusyException(codes[i].getExceptionMsg());
          builder.addResultOrException(getResultOrException(e, index));
          break;
      }
    }
  } finally {
    int processedMutationIndex = 0;
    for (Action mutation : mutations) {
      // The non-null mArray[i] means the cell scanner has been read.
      if (mArray[processedMutationIndex++] == null) {
        skipCellsForMutation(mutation, cells);
      }
    }
    updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
  }
}

/** Update put/delete batch latency metrics, if a metrics source is available. */
private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
  boolean batchContainsDelete) {
  final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
  if (metricsRegionServer != null) {
    long after = EnvironmentEdgeManager.currentTime();
    if (batchContainsPuts) {
      metricsRegionServer.updatePutBatch(region.getTableDescriptor().getTableName(),
        after - starttime);
    }
    if (batchContainsDelete) {
      metricsRegionServer.updateDeleteBatch(region.getTableDescriptor().getTableName(),
        after - starttime);
    }
  }
}

/**
 * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
 * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
 * @return an array of OperationStatus which internally contains the OperationStatusCode and the
 *         exceptionMessage if any
 */
private OperationStatus[] doReplayBatchOp(final HRegion region,
  final List<MutationReplay> mutations, long replaySeqId) throws IOException {
  long before = EnvironmentEdgeManager.currentTime();
  boolean batchContainsPuts = false, batchContainsDelete = false;
  try {
    for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
      MutationReplay m = it.next();

      if (m.getType() == MutationType.PUT) {
        batchContainsPuts = true;
      } else {
        batchContainsDelete = true;
      }

      // WAL-metadata cells (compaction/flush/region-event/bulk-load markers) are replayed
      // directly against the region and removed from the batch below.
      NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
      List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
      if (metaCells != null && !metaCells.isEmpty()) {
        for (Cell metaCell : metaCells) {
          CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
          boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
          HRegion hRegion = region;
          if (compactionDesc != null) {
            // replay the compaction. Remove the files from stores only if we are the primary
            // region replica (thus own the files)
            hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
              replaySeqId);
            continue;
          }
          FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
          if (flushDesc != null && !isDefaultReplica) {
            hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
            continue;
          }
          RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
          if (regionEvent != null && !isDefaultReplica) {
            hRegion.replayWALRegionEventMarker(regionEvent);
            continue;
          }
          BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
          if (bulkLoadEvent != null) {
            hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
            continue;
          }
        }
        it.remove();
      }
    }
    requestCount.increment();
    if (!region.getRegionInfo().isMetaRegion()) {
      regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
    }
    return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
      replaySeqId);
  } finally {
    updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
  }
}

private void closeAllScanners() {
  // Close any outstanding scanners. Means they'll get an UnknownScanner
  // exception next time they come in.
  for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
    try {
      e.getValue().s.close();
    } catch (IOException ioe) {
      LOG.warn("Closing scanner " + e.getKey(), ioe);
    }
  }
}

// Directly invoked only for testing
public RSRpcServices(final HRegionServer rs) throws IOException {
  final Configuration conf = rs.getConfiguration();
  regionServer = rs;
  rowSizeWarnThreshold =
    conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
  rejectRowsWithSizeOverThreshold =
    conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);

  final RpcSchedulerFactory rpcSchedulerFactory;
  try {
    rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class)
      .getDeclaredConstructor().newInstance();
  } catch (NoSuchMethodException | InvocationTargetException | InstantiationException
    | IllegalAccessException e) {
    throw new IllegalArgumentException(e);
  }
  // Server to handle client requests.
  final InetSocketAddress initialIsa;
  final InetSocketAddress bindAddress;
  if (this instanceof MasterRpcServices) {
    String hostname = DNS.getHostname(conf, DNS.ServerType.MASTER);
    int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
    // Creation of a HSA will force a resolve.
    initialIsa = new InetSocketAddress(hostname, port);
    bindAddress = new InetSocketAddress(conf.get("hbase.master.ipc.address", hostname), port);
  } else {
    String hostname = DNS.getHostname(conf, DNS.ServerType.REGIONSERVER);
    int port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
    // Creation of a HSA will force a resolve.
    initialIsa = new InetSocketAddress(hostname, port);
    bindAddress =
      new InetSocketAddress(conf.get("hbase.regionserver.ipc.address", hostname), port);
  }
  if (initialIsa.getAddress() == null) {
    throw new IllegalArgumentException("Failed resolve of " + initialIsa);
  }
  priority = createPriority();
  // Using Address means we don't get the IP too. Shorten it more even to just the host name
  // w/o the domain.
  final String name = rs.getProcessName() + "/"
    + Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain();
  // Set how many times to retry talking to another server over Connection.
  ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG);
  rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name);
  rpcServer.setRsRpcServices(this);
  if (!(rs instanceof HMaster)) {
    rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
  }
  scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
    HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
  maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
    HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
  rpcTimeout =
    conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
  minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
    DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);

  final InetSocketAddress address = rpcServer.getListenerAddress();
  if (address == null) {
    throw new IOException("Listener channel is closed");
  }
  // Set our address, however we need the final port that was given to rpcServer
  isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
  rpcServer.setErrorHandler(this);
  rs.setName(name);

  closedScanners = CacheBuilder.newBuilder()
    .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
}

protected RpcServerInterface createRpcServer(final Server server,
  final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
  final String name) throws IOException {
  final Configuration conf = server.getConfiguration();
  boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
  try {
    return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, // use final
                                                                                      // bindAddress
                                                                                      // for this
                                                                                      // server.
      conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
  } catch (BindException be) {
    throw new IOException(be.getMessage() + ". To switch ports use the '"
      + HConstants.REGIONSERVER_PORT + "' configuration property.",
      be.getCause() != null ? be.getCause() : be);
  }
}

protected Class<?> getRpcSchedulerFactoryClass() {
  final Configuration conf = regionServer.getConfiguration();
  return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
    SimpleRpcSchedulerFactory.class);
}

@Override
public void onConfigurationChange(Configuration newConf) {
  if (rpcServer instanceof ConfigurationObserver) {
    ((ConfigurationObserver) rpcServer).onConfigurationChange(newConf);
  }
}

protected PriorityFunction createPriority() {
  return new AnnotationReadingPriorityFunction(this);
}

/** No-op when no access checker is configured; otherwise enforces <code>perm</code>. */
protected void requirePermission(String request, Permission.Action perm) throws IOException {
  if (accessChecker != null) {
    accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), request, null, perm);
  }
}

public int getScannersCount() {
  return scanners.size();
}

/** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found.
 */
RegionScanner getScanner(long scannerId) {
  RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
  return rsh == null ? null : rsh.s;
}

/** Returns The associated RegionScannerHolder for <code>scannerId</code> or null. */
private RegionScannerHolder getRegionScannerHolder(long scannerId) {
  return scanners.get(toScannerName(scannerId));
}

/** Returns a human-readable "table/region/operation_id" summary for an open scanner, or null. */
public String getScanDetailsWithId(long scannerId) {
  RegionScanner scanner = getScanner(scannerId);
  if (scanner == null) {
    return null;
  }
  StringBuilder builder = new StringBuilder();
  builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
  builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
  builder.append(" operation_id: ").append(scanner.getOperationId());
  return builder.toString();
}

/** Returns a "table/region[/operation_id]" summary derived from a ScanRequest, or null. */
public String getScanDetailsWithRequest(ScanRequest request) {
  try {
    if (!request.hasRegion()) {
      return null;
    }
    Region region = getRegion(request.getRegion());
    StringBuilder builder = new StringBuilder();
    builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
    builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
    for (NameBytesPair pair : request.getScan().getAttributeList()) {
      if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
        builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
        break;
      }
    }
    return builder.toString();
  } catch (IOException ignored) {
    // Best-effort detail string for logging; region lookup failure yields null.
    return null;
  }
}

/**
 * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
 */
long getScannerVirtualTime(long scannerId) {
  RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
  return rsh == null ? 0L : rsh.getNextCallSeq();
}

/**
 * Method to account for the size of retained cells and retained data blocks.
 * @param context rpc call context
 * @param r result to add size.
 * @param lastBlock last block to check whether we need to add the block size in context.
 * @return an object that represents the last referenced block from this response.
 */
Object addSize(RpcCallContext context, Result r, Object lastBlock) {
  if (context != null && r != null && !r.isEmpty()) {
    for (Cell c : r.rawCells()) {
      context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));

      // Since byte buffers can point all kinds of crazy places it's harder to keep track
      // of which blocks are kept alive by what byte buffer.
      // So we make a guess.
      if (c instanceof ByteBufferExtendedCell) {
        ByteBufferExtendedCell bbCell = (ByteBufferExtendedCell) c;
        ByteBuffer bb = bbCell.getValueByteBuffer();
        if (bb != lastBlock) {
          context.incrementResponseBlockSize(bb.capacity());
          lastBlock = bb;
        }
      } else {
        // We're using the last block being the same as the current block as
        // a proxy for pointing to a new block. This won't be exact.
        // If there are multiple gets that bounce back and forth
        // Then it's possible that this will over count the size of
        // referenced blocks. However it's better to over count and
        // use two rpcs than to OOME the regionserver.
        byte[] valueArray = c.getValueArray();
        if (valueArray != lastBlock) {
          context.incrementResponseBlockSize(valueArray.length);
          lastBlock = valueArray;
        }
      }

    }
  }
  return lastBlock;
}

/** Returns Remote client's ip and port else {@link HConstants#EMPTY_STRING} if it can't be
 * determined.
   */
  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
  static String getRemoteClientIpAndPort() {
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    if (rpcCall == null) {
      return HConstants.EMPTY_STRING;
    }
    InetAddress address = rpcCall.getRemoteAddress();
    if (address == null) {
      return HConstants.EMPTY_STRING;
    }
    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
    // scanning than a hostname anyways.
    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
  }

  /** Returns Remote client's username. */
  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
  static String getUserName() {
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    if (rpcCall == null) {
      return HConstants.EMPTY_STRING;
    }
    return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING);
  }

  /**
   * Registers a newly opened scanner: creates its lease and records the holder in the
   * {@code scanners} map under {@code scannerName}.
   * @throws LeaseStillHeldException if a lease already exists for this scanner name
   */
  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
    HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
    Lease lease = regionServer.getLeaseManager().createLease(scannerName,
      this.scannerLeaseTimeoutPeriod, new ScannerListener(scannerName));
    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
    // If the scanner itself can serve as an RpcCallback, use it directly as the close callback.
    RpcCallback closeCallback =
      s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s);
    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
      needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
      + scannerName + ", " + existing;
    return rsh;
  }

  /** Returns true if the scan's row range covers the whole key range of the given region. */
  private boolean isFullRegionScan(Scan scan, HRegion region) {
    // If the scan start row equals or less than the start key of the region
    // and stop row greater than equals end key (if stop row present)
    // or if the stop row is empty
    // account this as a full region scan
    if (
      Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0
        && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0
          && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)
          || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
    ) {
      return true;
    }
    return false;
  }

  /**
   * Find the HRegion based on a region specifier
   * @param regionSpecifier the region specifier
   * @return the corresponding region
   * @throws IOException if the specifier is not null, but failed to find the region
   */
  public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException {
    return regionServer.getRegion(regionSpecifier.getValue().toByteArray());
  }

  /**
   * Find the List of HRegions based on a list of region specifiers
   * @param regionSpecifiers the list of region specifiers
   * @param stats accumulates a NotServingRegionException per specifier that could not be resolved
   * @return the corresponding list of regions
   * @throws IOException if any of the specifiers is not null, but failed to find the region
   */
  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
    final CacheEvictionStatsBuilder stats) {
    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
    for (RegionSpecifier regionSpecifier : regionSpecifiers) {
      try {
        regions.add(regionServer.getRegion(regionSpecifier.getValue().toByteArray()));
      } catch (NotServingRegionException e) {
        // Record the failure in the stats rather than failing the whole batch.
        stats.addException(regionSpecifier.getValue().toByteArray(), e);
      }
    }
    return regions;
  }

  public PriorityFunction getPriority() {
    return priority;
  }

  public Configuration getConfiguration() {
    return regionServer.getConfiguration();
  }

  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return regionServer.getRegionServerRpcQuotaManager();
  }

  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return regionServer.getRegionServerSpaceQuotaManager();
  }

  /**
   * Starts the RPC services: sets up the access checker and ZK permission watcher, creates the
   * scanner id generator, then starts the underlying RPC server.
   */
  void start(ZKWatcher zkWatcher) {
    if (AccessChecker.isAuthorizationSupported(getConfiguration())) {
      accessChecker = new AccessChecker(getConfiguration());
    } else {
      accessChecker = new NoopAccessChecker(getConfiguration());
    }
    zkPermissionWatcher =
      new ZKPermissionWatcher(zkWatcher, accessChecker.getAuthManager(), getConfiguration());
    try {
      zkPermissionWatcher.start();
    } catch (KeeperException e) {
      // Best effort: log and continue; the server still starts without the watcher.
      LOG.error("ZooKeeper permission watcher initialization failed", e);
    }
    this.scannerIdGenerator = new ScannerIdGenerator(this.regionServer.serverName);
    rpcServer.start();
  }

  /** Stops the RPC services: closes the permission watcher and all scanners, stops the server. */
  void stop() {
    if (zkPermissionWatcher != null) {
      zkPermissionWatcher.close();
    }
    closeAllScanners();
    rpcServer.stop();
  }

  /**
   * Called to verify that this server is up and running.
   */
  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
1552 protected void checkOpen() throws IOException { 1553 if (regionServer.isAborted()) { 1554 throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting"); 1555 } 1556 if (regionServer.isStopped()) { 1557 throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping"); 1558 } 1559 if (!regionServer.isDataFileSystemOk()) { 1560 throw new RegionServerStoppedException("File system not available"); 1561 } 1562 if (!regionServer.isOnline()) { 1563 throw new ServerNotRunningYetException( 1564 "Server " + regionServer.serverName + " is not running yet"); 1565 } 1566 } 1567 1568 /** 1569 * By default, put up an Admin and a Client Service. Set booleans 1570 * <code>hbase.regionserver.admin.executorService</code> and 1571 * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services. 1572 * Default is that both are enabled. 1573 * @return immutable list of blocking services and the security info classes that this server 1574 * supports 1575 */ 1576 protected List<BlockingServiceAndInterface> getServices() { 1577 boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true); 1578 boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true); 1579 boolean clientMeta = 1580 getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true); 1581 List<BlockingServiceAndInterface> bssi = new ArrayList<>(); 1582 if (client) { 1583 bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this), 1584 ClientService.BlockingInterface.class)); 1585 } 1586 if (admin) { 1587 bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this), 1588 AdminService.BlockingInterface.class)); 1589 } 1590 if (clientMeta) { 1591 bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this), 1592 ClientMetaService.BlockingInterface.class)); 1593 } 1594 return new 
ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build(); 1595 } 1596 1597 public InetSocketAddress getSocketAddress() { 1598 return isa; 1599 } 1600 1601 @Override 1602 public int getPriority(RequestHeader header, Message param, User user) { 1603 return priority.getPriority(header, param, user); 1604 } 1605 1606 @Override 1607 public long getDeadline(RequestHeader header, Message param) { 1608 return priority.getDeadline(header, param); 1609 } 1610 1611 /* 1612 * Check if an OOME and, if so, abort immediately to avoid creating more objects. n * 1613 * @return True if we OOME'd and are aborting. 1614 */ 1615 @Override 1616 public boolean checkOOME(final Throwable e) { 1617 return exitIfOOME(e); 1618 } 1619 1620 public static boolean exitIfOOME(final Throwable e) { 1621 boolean stop = false; 1622 try { 1623 if ( 1624 e instanceof OutOfMemoryError 1625 || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError) 1626 || (e.getMessage() != null && e.getMessage().contains("java.lang.OutOfMemoryError")) 1627 ) { 1628 stop = true; 1629 LOG.error(HBaseMarkers.FATAL, "Run out of memory; " + RSRpcServices.class.getSimpleName() 1630 + " will abort itself immediately", e); 1631 } 1632 } finally { 1633 if (stop) { 1634 Runtime.getRuntime().halt(1); 1635 } 1636 } 1637 return stop; 1638 } 1639 1640 /** 1641 * Close a region on the region server. 1642 * @param controller the RPC controller 1643 * @param request the request n 1644 */ 1645 @Override 1646 @QosPriority(priority = HConstants.ADMIN_QOS) 1647 public CloseRegionResponse closeRegion(final RpcController controller, 1648 final CloseRegionRequest request) throws ServiceException { 1649 final ServerName sn = (request.hasDestinationServer() 1650 ? 
ProtobufUtil.toServerName(request.getDestinationServer()) 1651 : null); 1652 1653 try { 1654 checkOpen(); 1655 throwOnWrongStartCode(request); 1656 final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion()); 1657 1658 requestCount.increment(); 1659 if (sn == null) { 1660 LOG.info("Close " + encodedRegionName + " without moving"); 1661 } else { 1662 LOG.info("Close " + encodedRegionName + ", moving to " + sn); 1663 } 1664 boolean closed = regionServer.closeRegion(encodedRegionName, false, sn); 1665 CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed); 1666 return builder.build(); 1667 } catch (IOException ie) { 1668 throw new ServiceException(ie); 1669 } 1670 } 1671 1672 /** 1673 * Compact a region on the region server. 1674 * @param controller the RPC controller 1675 * @param request the request n 1676 */ 1677 @Override 1678 @QosPriority(priority = HConstants.ADMIN_QOS) 1679 public CompactRegionResponse compactRegion(final RpcController controller, 1680 final CompactRegionRequest request) throws ServiceException { 1681 try { 1682 checkOpen(); 1683 requestCount.increment(); 1684 HRegion region = getRegion(request.getRegion()); 1685 // Quota support is enabled, the requesting user is not system/super user 1686 // and a quota policy is enforced that disables compactions. 
1687 if ( 1688 QuotaUtil.isQuotaEnabled(getConfiguration()) 1689 && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null)) 1690 && this.regionServer.getRegionServerSpaceQuotaManager() 1691 .areCompactionsDisabled(region.getTableDescriptor().getTableName()) 1692 ) { 1693 throw new DoNotRetryIOException( 1694 "Compactions on this region are " + "disabled due to a space quota violation."); 1695 } 1696 region.startRegionOperation(Operation.COMPACT_REGION); 1697 LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString()); 1698 boolean major = request.hasMajor() && request.getMajor(); 1699 if (request.hasFamily()) { 1700 byte[] family = request.getFamily().toByteArray(); 1701 String log = "User-triggered " + (major ? "major " : "") + "compaction for region " 1702 + region.getRegionInfo().getRegionNameAsString() + " and family " 1703 + Bytes.toString(family); 1704 LOG.trace(log); 1705 region.requestCompaction(family, log, Store.PRIORITY_USER, major, 1706 CompactionLifeCycleTracker.DUMMY); 1707 } else { 1708 String log = "User-triggered " + (major ? 
"major " : "") + "compaction for region " 1709 + region.getRegionInfo().getRegionNameAsString(); 1710 LOG.trace(log); 1711 region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY); 1712 } 1713 return CompactRegionResponse.newBuilder().build(); 1714 } catch (IOException ie) { 1715 throw new ServiceException(ie); 1716 } 1717 } 1718 1719 @Override 1720 public CompactionSwitchResponse compactionSwitch(RpcController controller, 1721 CompactionSwitchRequest request) throws ServiceException { 1722 rpcPreCheck("compactionSwitch"); 1723 final CompactSplit compactSplitThread = regionServer.getCompactSplitThread(); 1724 requestCount.increment(); 1725 boolean prevState = compactSplitThread.isCompactionsEnabled(); 1726 CompactionSwitchResponse response = 1727 CompactionSwitchResponse.newBuilder().setPrevState(prevState).build(); 1728 if (prevState == request.getEnabled()) { 1729 // passed in requested state is same as current state. No action required 1730 return response; 1731 } 1732 compactSplitThread.switchCompaction(request.getEnabled()); 1733 return response; 1734 } 1735 1736 /** 1737 * Flush a region on the region server. 1738 * @param controller the RPC controller 1739 * @param request the request n 1740 */ 1741 @Override 1742 @QosPriority(priority = HConstants.ADMIN_QOS) 1743 public FlushRegionResponse flushRegion(final RpcController controller, 1744 final FlushRegionRequest request) throws ServiceException { 1745 try { 1746 checkOpen(); 1747 requestCount.increment(); 1748 HRegion region = getRegion(request.getRegion()); 1749 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); 1750 boolean shouldFlush = true; 1751 if (request.hasIfOlderThanTs()) { 1752 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs(); 1753 } 1754 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); 1755 if (shouldFlush) { 1756 boolean writeFlushWalMarker = 1757 request.hasWriteFlushWalMarker() ? 
request.getWriteFlushWalMarker() : false; 1758 // Go behind the curtain so we can manage writing of the flush WAL marker 1759 HRegion.FlushResultImpl flushResult = null; 1760 if (request.hasFamily()) { 1761 List families = new ArrayList(); 1762 families.add(request.getFamily().toByteArray()); 1763 flushResult = 1764 region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1765 } else { 1766 flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1767 } 1768 boolean compactionNeeded = flushResult.isCompactionNeeded(); 1769 if (compactionNeeded) { 1770 regionServer.getCompactSplitThread().requestSystemCompaction(region, 1771 "Compaction through user triggered flush"); 1772 } 1773 builder.setFlushed(flushResult.isFlushSucceeded()); 1774 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker); 1775 } 1776 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores()); 1777 return builder.build(); 1778 } catch (DroppedSnapshotException ex) { 1779 // Cache flush can fail in a few places. If it fails in a critical 1780 // section, we get a DroppedSnapshotException and a replay of wal 1781 // is required. Currently the only way to do this is a restart of 1782 // the server. 1783 regionServer.abort("Replay of WAL required. 
Forcing server shutdown", ex); 1784 throw new ServiceException(ex); 1785 } catch (IOException ie) { 1786 throw new ServiceException(ie); 1787 } 1788 } 1789 1790 @Override 1791 @QosPriority(priority = HConstants.ADMIN_QOS) 1792 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, 1793 final GetOnlineRegionRequest request) throws ServiceException { 1794 try { 1795 checkOpen(); 1796 requestCount.increment(); 1797 Map<String, HRegion> onlineRegions = regionServer.getOnlineRegions(); 1798 List<RegionInfo> list = new ArrayList<>(onlineRegions.size()); 1799 for (HRegion region : onlineRegions.values()) { 1800 list.add(region.getRegionInfo()); 1801 } 1802 list.sort(RegionInfo.COMPARATOR); 1803 return ResponseConverter.buildGetOnlineRegionResponse(list); 1804 } catch (IOException ie) { 1805 throw new ServiceException(ie); 1806 } 1807 } 1808 1809 // Master implementation of this Admin Service differs given it is not 1810 // able to supply detail only known to RegionServer. See note on 1811 // MasterRpcServers#getRegionInfo. 
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
    final GetRegionInfoRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      RegionInfo info = region.getRegionInfo();
      byte[] bestSplitRow;
      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
        bestSplitRow = region.checkSplit(true).orElse(null);
        // when all table data are in memstore, bestSplitRow = null
        // try to flush region first
        if (bestSplitRow == null) {
          region.flush(true);
          bestSplitRow = region.checkSplit(true).orElse(null);
        }
      } else {
        bestSplitRow = null;
      }
      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
      if (request.hasCompactionState() && request.getCompactionState()) {
        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
      }
      builder.setSplittable(region.isSplittable());
      builder.setMergeable(region.isMergeable());
      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /** Returns the load of every region of the given table, or of all regions if no table given. */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
    throws ServiceException {

    List<HRegion> regions;
    if (request.hasTableName()) {
      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
      regions = regionServer.getRegions(tableName);
    } else {
      regions = regionServer.getRegions();
    }
    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();

    try {
      for (HRegion region : regions) {
        rLoads.add(regionServer.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
      }
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
    builder.addAllRegionLoads(rLoads);
    return builder.build();
  }

  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
    ClearCompactionQueuesRequest request) throws ServiceException {
    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
      + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
    requestCount.increment();
    // CAS guard: only one clear-queues request may run at a time across admins.
    if (clearCompactionQueues.compareAndSet(false, true)) {
      final CompactSplit compactSplitThread = regionServer.getCompactSplitThread();
      try {
        checkOpen();
        regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues();
        for (String queueName : request.getQueueNameList()) {
          LOG.debug("clear " + queueName + " compaction queue");
          switch (queueName) {
            case "long":
              compactSplitThread.clearLongCompactionsQueue();
              break;
            case "short":
              compactSplitThread.clearShortCompactionsQueue();
              break;
            default:
              LOG.warn("Unknown queue name " + queueName);
              throw new IOException("Unknown queue name " + queueName);
          }
        }
        regionServer.getRegionServerCoprocessorHost().postClearCompactionQueues();
      } catch (IOException ie) {
        throw new ServiceException(ie);
      } finally {
        // Always release the guard, even on failure part-way through the queue list.
        clearCompactionQueues.set(false);
      }
    } else {
      LOG.warn("Clear compactions queue is executing by other admin.");
    }
    return respBuilder.build();
  }

  /**
   * Get some information of the region server.
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetServerInfoResponse getServerInfo(final RpcController controller,
    final GetServerInfoRequest request) throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
    requestCount.increment();
    int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
    return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
  }

  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetStoreFileResponse getStoreFile(final RpcController controller,
    final GetStoreFileRequest request) throws ServiceException {
    try {
      checkOpen();
      HRegion region = getRegion(request.getRegion());
      requestCount.increment();
      Set<byte[]> columnFamilies;
      if (request.getFamilyCount() == 0) {
        // No families specified in the request: list store files of every family in the region.
        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
      } else {
        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
        for (ByteString cf : request.getFamilyList()) {
          columnFamilies.add(cf.toByteArray());
        }
      }
      int nCF = columnFamilies.size();
      List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
      builder.addAllStoreFile(fileList);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /** Validates the server start code carried by an OpenRegionRequest, if present. */
  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  /** Validates the server start code carried by a CloseRegionRequest, if present. */
  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
    // check that we are the same server that this RPC is intended for.
    if (regionServer.serverName.getStartcode() != serverStartCode) {
      throw new ServiceException(new DoNotRetryIOException(
        "This RPC was intended for a " + "different server with startCode: " + serverStartCode
          + ", this server is: " + regionServer.serverName));
    }
  }

  /** Validates the start codes of every open/close sub-request of an ExecuteProceduresRequest. */
  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
    if (req.getOpenRegionCount() > 0) {
      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
        throwOnWrongStartCode(openReq);
      }
    }
    if (req.getCloseRegionCount() > 0) {
      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
        throwOnWrongStartCode(closeReq);
      }
    }
  }

  /**
   * Open asynchronously a region or a set of regions on the region server. The opening is
   * coordinated by ZooKeeper, and this method requires the znode to be created before being called.
   * As a consequence, this method should be called only from the master.
   * <p>
   * Different manages states for the region are:
   * </p>
   * <ul>
   * <li>region not opened: the region opening will start asynchronously.</li>
   * <li>a close is already in progress: this is considered as an error.</li>
   * <li>an open is already in progress: this new open request will be ignored. This is important
   * because the Master can do multiple requests if it crashes.</li>
   * <li>the region is already opened: this new open request will be ignored.</li>
   * </ul>
   * <p>
   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
   * errors are put in the response as FAILED_OPENING.
   * </p>
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public OpenRegionResponse openRegion(final RpcController controller,
    final OpenRegionRequest request) throws ServiceException {
    requestCount.increment();
    throwOnWrongStartCode(request);

    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
    final int regionCount = request.getOpenInfoCount();
    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
    final boolean isBulkAssign = regionCount > 1;
    try {
      checkOpen();
    } catch (IOException ie) {
      // If the server is not yet open, only a single-region request for hbase:meta is allowed
      // to wait for the server to come online; everything else fails immediately.
      TableName tableName = null;
      if (regionCount == 1) {
        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
          request.getOpenInfo(0).getRegion();
        if (ri != null) {
          tableName = ProtobufUtil.toTableName(ri.getTableName());
        }
      }
      if (!TableName.META_TABLE_NAME.equals(tableName)) {
        throw new ServiceException(ie);
      }
      // We are assigning meta, wait a little for regionserver to finish initialization.
      // Default to quarter of RPC timeout
      int timeout = regionServer.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
      synchronized (regionServer.online) {
        try {
          while (
            EnvironmentEdgeManager.currentTime() <= endTime && !regionServer.isStopped()
              && !regionServer.isOnline()
          ) {
            regionServer.online.wait(regionServer.msgInterval);
          }
          checkOpen();
        } catch (InterruptedException t) {
          Thread.currentThread().interrupt();
          throw new ServiceException(t);
        } catch (IOException e) {
          throw new ServiceException(e);
        }
      }
    }

    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;

    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableDescriptor htd;
      try {
        String encodedName = region.getEncodedName();
        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
        final HRegion onlineRegion = regionServer.getRegion(encodedName);
        if (onlineRegion != null) {
          // The region is already online. This should not happen any more.
          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
            + ", which is already online";
          LOG.warn(error);
          // regionServer.abort(error);
          // throw new IOException(error);
          builder.addOpeningState(RegionOpeningState.OPENED);
          continue;
        }
        LOG.info("Open " + region.getRegionNameAsString());

        final Boolean previous =
          regionServer.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);

        if (Boolean.FALSE.equals(previous)) {
          if (regionServer.getRegion(encodedName) != null) {
            // There is a close in progress. This should not happen any more.
            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
              + ", which we are already trying to CLOSE";
            regionServer.abort(error);
            throw new IOException(error);
          }
          regionServer.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
        }

        if (Boolean.TRUE.equals(previous)) {
          // An open is in progress. This is supported, but let's log this.
          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
            + ", which we are already trying to OPEN"
            + " - ignoring this new request for this region.");
        }

        // We are opening this region. If it moves back and forth for whatever reason, we don't
        // want to keep returning the stale moved record while we are opening/if we close again.
        regionServer.removeFromMovedRegions(region.getEncodedName());

        if (previous == null || !previous.booleanValue()) {
          htd = htds.get(region.getTable());
          if (htd == null) {
            // Cache table descriptors across the regions of this bulk request.
            htd = regionServer.tableDescriptors.get(region.getTable());
            htds.put(region.getTable(), htd);
          }
          if (htd == null) {
            throw new IOException("Missing table descriptor for " + region.getEncodedName());
          }
          // If there is no action in progress, we can submit a specific handler.
          // Need to pass the expected version in the constructor.
          if (regionServer.executorService == null) {
            LOG.info("No executor executorService; skipping open request");
          } else {
            if (region.isMetaRegion()) {
              regionServer.executorService.submit(
                new OpenMetaHandler(regionServer, regionServer, region, htd, masterSystemTime));
            } else {
              if (regionOpenInfo.getFavoredNodesCount() > 0) {
                regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
                  regionOpenInfo.getFavoredNodesList());
              }
              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
                regionServer.executorService.submit(new OpenPriorityRegionHandler(regionServer,
                  regionServer, region, htd, masterSystemTime));
              } else {
                regionServer.executorService.submit(
                  new OpenRegionHandler(regionServer, regionServer, region, htd, masterSystemTime));
              }
            }
          }
        }

        builder.addOpeningState(RegionOpeningState.OPENED);
      } catch (IOException ie) {
        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
        if (isBulkAssign) {
          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
        } else {
          throw new ServiceException(ie);
        }
      }
    }
    return builder.build();
  }

  /**
   * Warmup a region on this server. This method should only be called by Master. It synchronously
   * opens the region and closes the region bringing the most important pages in cache.
2165 */ 2166 @Override 2167 public WarmupRegionResponse warmupRegion(final RpcController controller, 2168 final WarmupRegionRequest request) throws ServiceException { 2169 final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo()); 2170 WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance(); 2171 try { 2172 checkOpen(); 2173 String encodedName = region.getEncodedName(); 2174 byte[] encodedNameBytes = region.getEncodedNameAsBytes(); 2175 final HRegion onlineRegion = regionServer.getRegion(encodedName); 2176 if (onlineRegion != null) { 2177 LOG.info("{} is online; skipping warmup", region); 2178 return response; 2179 } 2180 TableDescriptor htd = regionServer.tableDescriptors.get(region.getTable()); 2181 if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) { 2182 LOG.info("{} is in transition; skipping warmup", region); 2183 return response; 2184 } 2185 LOG.info("Warmup {}", region.getRegionNameAsString()); 2186 HRegion.warmupHRegion(region, htd, regionServer.getWAL(region), 2187 regionServer.getConfiguration(), regionServer, null); 2188 } catch (IOException ie) { 2189 LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie); 2190 throw new ServiceException(ie); 2191 } 2192 2193 return response; 2194 } 2195 2196 /** 2197 * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is 2198 * that the given mutations will be durable on the receiving RS if this method returns without any 2199 * exception. 
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.REPLAY_QOS)
  public ReplicateWALEntryResponse replay(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    // Take ownership of the cell block carried alongside the RPC and detach it from the
    // controller so the reference is not held across the call.
    CellScanner cells = ((HBaseRpcController) controller).cellScanner();
    ((HBaseRpcController) controller).setCellScanner(null);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      // All entries in one replay request must target the same region; resolve it once.
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
      RegionCoprocessorHost coprocessorHost =
        ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
          ? region.getCoprocessorHost()
          : null; // do not invoke coprocessors if this is a secondary region replica
      // Collects WAL entries that passed preWALRestore, so postWALRestore can be fired for
      // exactly the same set after the batch completes.
      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();

      // Skip adding the edits to WAL if this is a secondary region replica
      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;

      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException("Replay request contains entries from multiple "
            + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
            + entry.getKey().getEncodedRegionName());
        }
        // Report replayed nonces so client retries do not get re-applied on the primary.
        if (regionServer.nonceManager != null && isPrimary) {
          long nonceGroup =
            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce,
            entry.getKey().getWriteTime());
        }
        // Only materialize the (WALKey, WALEdit) pair when coprocessors need to see it.
        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
        List<MutationReplay> edits =
          WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
        if (coprocessorHost != null) {
          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
          // KeyValue.
          if (
            coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
              walEntry.getSecond())
          ) {
            // if bypass this log entry, ignore it ...
            continue;
          }
          walEntries.add(walEntry);
        }
        if (edits != null && !edits.isEmpty()) {
          // HBASE-17924
          // sort to improve lock efficiency
          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
          long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
            ? entry.getKey().getOrigSequenceNumber()
            : entry.getKey().getLogSequenceNumber();
          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
          // check if it's a partial success
          for (int i = 0; result != null && i < result.length; i++) {
            if (result[i] != OperationStatus.SUCCESS) {
              throw new IOException(result[i].getExceptionMsg());
            }
          }
        }
      }

      // sync wal at the end because ASYNC_WAL is used above
      WAL wal = region.getWAL();
      if (wal != null) {
        wal.sync();
      }

      // Fire postWALRestore for every entry that went through preWALRestore above.
      if (coprocessorHost != null) {
        for (Pair<WALKey, WALEdit> entry : walEntries) {
          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
            entry.getSecond());
        }
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record replay latency even on failure.
      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
      }
    }
  }

  /**
   * Replicate WAL entries on the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.REPLICATION_QOS)
  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
    try {
      checkOpen();
      if (regionServer.getReplicationSinkService() != null) {
        requestCount.increment();
        List<WALEntry> entries = request.getEntryList();
        // Detach the cell block from the controller before handing it to the sink so the
        // reference is not retained across the call.
        CellScanner cellScanner = ((HBaseRpcController) controller).cellScanner();
        ((HBaseRpcController) controller).setCellScanner(null);
        regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries();
        regionServer.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
          request.getSourceHFileArchiveDirPath());
        regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries();
        return ReplicateWALEntryResponse.newBuilder().build();
      } else {
        // Sink not yet initialized: surface directly (not wrapped in IOException).
        throw new ServiceException("Replication services are not initialized yet");
      }
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Roll the WAL writer of the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  public RollWALWriterResponse rollWALWriter(final RpcController controller,
    final RollWALWriterRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      // pre/post hooks bracket the actual roll request handed to the WAL roller.
      regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
      regionServer.getWalRoller().requestRollAll();
      regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Stop the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public StopServerResponse stopServer(final RpcController controller,
    final StopServerRequest request) throws ServiceException {
    rpcPreCheck("stopServer");
    requestCount.increment();
    String reason = request.getReason();
    regionServer.stop(reason);
    return StopServerResponse.newBuilder().build();
  }

  /**
   * Update the favored-nodes mapping for the regions named in the request. Entries without any
   * favored nodes are skipped. The response carries the number of entries received (not the
   * number actually applied).
   */
  @Override
  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
    UpdateFavoredNodesRequest request) throws ServiceException {
    rpcPreCheck("updateFavoredNodes");
    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
        regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
          regionUpdateInfo.getFavoredNodesList());
      }
    }
    respBuilder.setResponse(openInfoList.size());
    return respBuilder.build();
  }

  /**
   * Atomically bulk load several HFiles into an open region
   * @return true if successful, false if failed but recoverably (no action)
   * @throws ServiceException if failed unrecoverably
   */
  @Override
  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
    final BulkLoadHFileRequest request) throws ServiceException {
    long start = EnvironmentEdgeManager.currentTime();
    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
    // If this cluster's id is already in the list the load has looped back to us via
    // replication; report loaded without re-applying. Otherwise tag the request with our id.
    if (clusterIds.contains(this.regionServer.clusterId)) {
      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
    } else {
      clusterIds.add(this.regionServer.clusterId);
    }
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      Map<byte[], List<Path>> map = null;
      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
      // Negative means "size unknown / quota not consulted"; see the check further below.
      long sizeToBeLoaded = -1;

      // Check to see if this bulk load would exceed the space quota for this table
      if (spaceQuotaEnabled) {
        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
        SpaceViolationPolicyEnforcement enforcement =
          activeSpaceQuotas.getPolicyEnforcement(region);
        if (enforcement != null) {
          // Bulk loads must still be atomic. We must enact all or none.
          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
          for (FamilyPath familyPath : request.getFamilyPathList()) {
            filePaths.add(familyPath.getPath());
          }
          // Check if the batch of files exceeds the current quota
          sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths);
        }
      }

      // (family, HFile path) pairs to load, decoded from the protobuf request.
      List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
      for (FamilyPath familyPath : request.getFamilyPathList()) {
        familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath()));
      }
      if (!request.hasBulkToken()) {
        // Plain (non-secure) bulk load path, bracketed by coprocessor hooks.
        if (region.getCoprocessorHost() != null) {
          region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
        }
        try {
          map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
            request.getCopyFile(), clusterIds, request.getReplicate());
        } finally {
          // post hook runs even when the load throws; map stays null in that case.
          if (region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
          }
        }
      } else {
        // secure bulk load
        map =
          regionServer.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
      }
      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
      builder.setLoaded(map != null);
      if (map != null) {
        // Treat any negative size as a flag to "ignore" updating the region size as that is
        // not possible to occur in real life (cannot bulk load a file with negative size)
        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
              + sizeToBeLoaded + " bytes");
          }
          // Inform space quotas of the new files for this region
          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
            sizeToBeLoaded);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record bulk-load latency even on failure.
      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
      }
    }
  }

  /**
   * Obtain a bulk-load token for a secure bulk load into the requested region.
   */
  @Override
  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
    PrepareBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      String bulkToken = regionServer.getSecureBulkLoadManager().prepareBulkLoad(region, request);
      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
      builder.setBulkToken(bulkToken);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Release the resources associated with a previously prepared secure bulk load.
   */
  @Override
  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
    CleanupBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      regionServer.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
      return CleanupBulkLoadResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Execute a region-level coprocessor endpoint call and marshal its result back to the client.
   */
  @Override
  public CoprocessorServiceResponse execService(final RpcController controller,
    final CoprocessorServiceRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      com.google.protobuf.Message result = execServiceOnRegion(region, request.getCall());
      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
        region.getRegionInfo().getRegionName()));
      // TODO: COPIES!!!!!!
      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
        org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Run a coprocessor endpoint call against the given region using a fresh server-side
   * controller.
   */
  private com.google.protobuf.Message execServiceOnRegion(HRegion region,
    final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
    // ignore the passed in controller (from the serialized call)
    ServerRpcController execController = new ServerRpcController();
    return region.execService(execController, serviceCall);
  }

  /**
   * Get data from a table.
   * @param controller the RPC controller
   * @param request    the get request
   */
  @Override
  public GetResponse get(final RpcController controller, final GetRequest request)
    throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    OperationQuota quota = null;
    HRegion region = null;
    try {
      checkOpen();
      requestCount.increment();
      rpcGetRequestCount.increment();
      region = getRegion(request.getRegion());

      GetResponse.Builder builder = GetResponse.newBuilder();
      ClientProtos.Get get = request.getGet();
      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
      // they are; it's a problem for non-native clients like asynchbase. HBASE-20225.
      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
          + "reverse Scan.");
      }
      Boolean existence = null;
      Result r = null;
      RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);

      Get clientGet = ProtobufUtil.toGet(get);
      // Existence-only gets may be short-circuited entirely by the preExists hook.
      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
        existence = region.getCoprocessorHost().preExists(clientGet);
      }
      if (existence == null) {
        if (context != null) {
          r = get(clientGet, (region), null, context);
        } else {
          // for test purpose
          r = region.get(clientGet);
        }
        if (get.getExistenceOnly()) {
          boolean exists = r.getExists();
          if (region.getCoprocessorHost() != null) {
            exists = region.getCoprocessorHost().postExists(clientGet, exists);
          }
          existence = exists;
        }
      }
      if (existence != null) {
        // Existence-only answer: result carries just the boolean (stale flag for replicas).
        ClientProtos.Result pbr =
          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
        builder.setResult(pbr);
      } else if (r != null) {
        ClientProtos.Result pbr;
        // Clients >= 1.3 that support cell blocks get the cells out-of-band via the
        // controller's cell scanner; older clients get cells embedded in the pb Result.
        if (
          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
        ) {
          pbr = ProtobufUtil.toResultNoData(r);
          ((HBaseRpcController) controller)
            .setCellScanner(CellUtil.createCellScanner(r.rawCells()));
          addSize(context, r, null);
        } else {
          pbr = ProtobufUtil.toResult(r);
        }
        builder.setResult(pbr);
      }
      // r.cells is null when a table.exists(get) call
      if (r != null && r.rawCells() != null) {
        quota.addGetResult(r);
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record get latency and release the quota even on failure.
      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
      if (metricsRegionServer
!= null) {
        TableDescriptor td = region != null ? region.getTableDescriptor() : null;
        if (td != null) {
          metricsRegionServer.updateGet(td.getTableName(),
            EnvironmentEdgeManager.currentTime() - before);
        }
      }
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Server-side get that defers scanner close to the RPC callback: the scanner created for the
   * get is registered either on the given context (single get) or on the supplied
   * closeCallBack (gets issued from multi()).
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
    RpcCallContext context) throws IOException {
    region.prepareGet(get);
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    long before = EnvironmentEdgeManager.currentTime();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        // Hook bypassed the core get; return whatever the hook populated.
        region.metricsUpdateForGet(results, before);
        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
          stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then sent out to the
          // rpc. The rpc callback will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet(results, before);

    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
  }

  /**
   * Count the total number of actions in a multi request, warn when the batch exceeds the
   * configured row-size threshold, and optionally reject it (HBASE-18023).
   */
  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
    int sum = 0;
    String firstRegionName = null;
    for (RegionAction regionAction : request.getRegionActionList()) {
      if (sum == 0) {
        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
      }
      sum += regionAction.getActionCount();
    }
    if (sum > rowSizeWarnThreshold) {
      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
        + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
        + RpcServer.getRequestUserName().orElse(null) + "/"
        + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
      if (rejectRowsWithSizeOverThreshold) {
        throw new ServiceException(
          "Rejecting large batch operation for current batch with firstRegionName: "
            + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
            + rowSizeWarnThreshold);
      }
    }
  }

  /**
   * Record a whole-RegionAction failure in the multi response and keep the shared cell scanner
   * consistent by skipping the cells that belonged to the failed action's mutations.
   */
  private void failRegionAction(MultiResponse.Builder responseBuilder,
    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
    CellScanner cellScanner, Throwable error) {
    rpcServer.getMetrics().exception(error);
    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
    // All Mutations in this RegionAction not executed as we can not see the Region online here
    // in this RS. Will be retried from Client.
    // Skipping all the Cells in CellScanner
    // corresponding to these Mutations.
    if (cellScanner != null) {
      skipCellsForMutations(regionAction.getActionList(), cellScanner);
    }
  }

  /**
   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
   * @param rpcc    the RPC controller
   * @param request the multi request
   */
  @Override
  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
    throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }

    checkBatchSizeAndLogLargeSize(request);

    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
    if (controller != null) {
      controller.setCellScanner(null);
    }

    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;

    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
    this.rpcMultiRequestCount.increment();
    this.requestCount.increment();
    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();

    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
    // following logic is for backward compatibility as old clients still use
    // MultiRequest#condition in case of checkAndMutate with RowMutations.
    if (request.hasCondition()) {
      if (request.getRegionActionList().isEmpty()) {
        // If the region action list is empty, do nothing.
        responseBuilder.setProcessed(true);
        return responseBuilder.build();
      }

      // Old-style conditional multi only ever carries a single RegionAction.
      RegionAction regionAction = request.getRegionAction(0);

      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
      // we can assume regionAction.getAtomic() is true here.
      assert regionAction.getAtomic();

      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        return responseBuilder.build();
      }

      try {
        CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
          cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
        responseBuilder.setProcessed(result.isSuccess());
        ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
          ClientProtos.ResultOrException.newBuilder();
        for (int i = 0; i < regionAction.getActionCount(); i++) {
          // To unify the response format with doNonAtomicRegionMutation and read through
          // client's AsyncProcess we have to add an empty result instance per operation
          resultOrExceptionOrBuilder.clear();
          resultOrExceptionOrBuilder.setIndex(i);
          regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
        }
      } catch (IOException e) {
        rpcServer.getMetrics().exception(e);
        // As it's an atomic operation with a condition, we may expect it's a global failure.
        regionActionResultBuilder.setException(ResponseConverter.buildException(e));
      } finally {
        // Always release the quota acquired above.
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
      }
      return responseBuilder.build();
    }

    // this will contain all the cells that we need to return. It's created later, if needed.
    List<CellScannable> cellsToReturn = null;
    RegionScannersCloseCallBack closeCallBack = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
      new HashMap<>(request.getRegionActionCount());

    // Normal path: process each RegionAction independently; a failure in one region does not
    // abort the others.
    for (RegionAction regionAction : request.getRegionActionList()) {
      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();
      regionActionResultBuilder.clear();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        continue; // For this region it's a failure.
      }

      try {
        if (regionAction.hasCondition()) {
          // Conditional (checkAndMutate) RegionAction.
          try {
            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
              ClientProtos.ResultOrException.newBuilder();
            if (regionAction.getActionCount() == 1) {
              // Single-mutation checkAndMutate.
              CheckAndMutateResult result =
                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              resultOrExceptionOrBuilder.setIndex(0);
              if (result.getResult() != null) {
                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
              }
              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
            } else {
              // checkAndMutate over a RowMutations batch.
              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              for (int i = 0; i < regionAction.getActionCount(); i++) {
                if (i == 0 && result.getResult() != null) {
                  // Set the result of the Increment/Append operations to the first element of the
                  // ResultOrException list
                  resultOrExceptionOrBuilder.setIndex(i);
                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
                  continue;
                }
                // To unify the response format with doNonAtomicRegionMutation and read through
                // client's AsyncProcess we have to add an empty result instance per operation
                resultOrExceptionOrBuilder.clear();
                resultOrExceptionOrBuilder.setIndex(i);
                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
              }
            }
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's an atomic operation with a condition, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
          // Unconditional atomic batch (RowMutations / mutateRows).
          try {
            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
              cellScanner, nonceGroup, spaceQuotaEnforcement);
            regionActionResultBuilder.setProcessed(true);
            // We no longer use MultiResponse#processed. Instead, we use
            // RegionActionResult#processed. This is for backward compatibility for old clients.
            responseBuilder.setProcessed(true);
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's atomic, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else {
          // doNonAtomicRegionMutation manages the exception internally
          if (context != null && closeCallBack == null) {
            // An RpcCallBack that creates a list of scanners that needs to perform callBack
            // operation on completion of multiGets.
            // Set this only once
            closeCallBack = new RegionScannersCloseCallBack();
            context.setCallBack(closeCallBack);
          }
          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
            spaceQuotaEnforcement);
        }
      } finally {
        // Release the per-RegionAction quota regardless of outcome.
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        regionStats.put(regionSpecifier, regionLoadStats);
      }
    }
    // Load the controller with the Cells to return.
    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
    }

    // Attach per-region load statistics collected above to the response.
    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
      builder.addRegion(stat.getKey());
      builder.addStat(stat.getValue());
    }
    responseBuilder.setRegionStatistics(builder);
    return responseBuilder.build();
  }

  /**
   * Advance the shared cell scanner past the cells of every mutation in the given actions. Used
   * when a whole RegionAction fails before execution so subsequent actions still read their own
   * cells.
   */
  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    for (Action action : actions) {
      skipCellsForMutation(action, cellScanner);
    }
  }

  /**
   * Advance the shared cell scanner past the cells associated with a single mutation, if any.
   */
  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    try {
      if (action.hasMutation()) {
        MutationProto m = action.getMutation();
        if (m.hasAssociatedCellCount()) {
          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
            cellScanner.advance();
          }
        }
      }
    } catch (IOException e) {
      // No need to handle these Individual Mutation level issue. Any way this entire RegionAction
      // marked as failed as we could not see the Region here. At client side the top level
      // RegionAction exception will be considered first.
      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
    }
  }

  /**
   * Mutate data in a table.
   * @param rpcc    the RPC controller
   * @param request the mutate request
   */
  @Override
  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
    throws ServiceException {
    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
    OperationQuota quota = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    // Clear scanner so we are not holding on to reference across call.
    if (controller != null) {
      controller.setCellScanner(null);
    }
    try {
      checkOpen();
      requestCount.increment();
      rpcMutateRequestCount.increment();
      HRegion region = getRegion(request.getRegion());
      MutateResponse.Builder builder = MutateResponse.newBuilder();
      MutationProto mutation = request.getMutation();
      // Give the memstore flusher a chance to reclaim memory before taking the write
      // (skipped for meta to keep meta operations responsive).
      if (!region.getRegionInfo().isMetaRegion()) {
        regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
      ActivePolicyEnforcement spaceQuotaEnforcement =
        getSpaceQuotaManager().getActiveEnforcements();

      if (request.hasCondition()) {
        // Conditional single-mutation path (checkAndPut / checkAndDelete / checkAndMutate).
        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
          request.getCondition(), nonceGroup, spaceQuotaEnforcement);
        builder.setProcessed(result.isSuccess());
        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
        if (clientCellBlockSupported) {
          addSize(context, result.getResult(), null);
        }
      } else {
        Result r = null;
        Boolean processed = null;
        MutationType type = mutation.getMutateType();
        switch (type) {
          case APPEND:
            // TODO: this doesn't actually check anything.
            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
            break;
          case INCREMENT:
            // TODO: this doesn't actually check anything.
            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
            break;
          case PUT:
            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          case DELETE:
            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          default:
            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
        }
        if (processed != null) {
          builder.setProcessed(processed);
        }
        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
        addResult(builder, r, controller, clientCellBlockSupported);
        if (clientCellBlockSupported) {
          addSize(context, r, null);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      // An IOException here may indicate filesystem trouble; trigger a filesystem check.
      regionServer.checkFileSystem();
      throw new ServiceException(ie);
    } finally {
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Apply a single Put after cell-size, space-quota and throughput-quota checks; records put
   * latency metrics.
   */
  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Put put = ProtobufUtil.toPut(mutation, cellScanner);
    checkCellSizeLimit(region, put);
    spaceQuota.getPolicyEnforcement(region).check(put);
    quota.addMutation(put);
    region.put(put);

    MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updatePut(region.getRegionInfo().getTable(), after - before);
    }
  }

  /**
   * Apply a single Delete after cell-size, space-quota and throughput-quota checks; records
   * delete latency metrics.
   */
  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
    checkCellSizeLimit(region, delete);
    spaceQuota.getPolicyEnforcement(region).check(delete);
    quota.addMutation(delete);
    region.delete(delete);

    MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updateDelete(region.getRegionInfo().getTable(), after - before);
    }
  }

  /**
   * Execute a single-mutation checkAndMutate, honoring the preCheckAndMutate /
   * postCheckAndMutate coprocessor hooks (a non-null pre result bypasses the region call) and
   * updating checkAndMutate metrics.
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
    quota.addMutation((Mutation) checkAndMutate.getAction());

    CheckAndMutateResult result = null;
    if (region.getCoprocessorHost() != null) {
      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
    }
    if (result == null) {
      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
      if (region.getCoprocessorHost() != null) {
        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
      }
    }
    MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updateCheckAndMutate(region.getRegionInfo().getTable(), after - before);

      // Additionally bump the type-specific checkAndPut/checkAndDelete metric.
      MutationType type = mutation.getMutateType();
      switch (type) {
        case PUT:
metricsRegionServer.updateCheckAndPut(region.getRegionInfo().getTable(), after - before); 3087 break; 3088 case DELETE: 3089 metricsRegionServer.updateCheckAndDelete(region.getRegionInfo().getTable(), 3090 after - before); 3091 break; 3092 default: 3093 break; 3094 } 3095 } 3096 return result; 3097 } 3098 3099 // This is used to keep compatible with the old client implementation. Consider remove it if we 3100 // decide to drop the support of the client that still sends close request to a region scanner 3101 // which has already been exhausted. 3102 @Deprecated 3103 private static final IOException SCANNER_ALREADY_CLOSED = new IOException() { 3104 3105 private static final long serialVersionUID = -4305297078988180130L; 3106 3107 @Override 3108 public synchronized Throwable fillInStackTrace() { 3109 return this; 3110 } 3111 }; 3112 3113 private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException { 3114 String scannerName = toScannerName(request.getScannerId()); 3115 RegionScannerHolder rsh = this.scanners.get(scannerName); 3116 if (rsh == null) { 3117 // just ignore the next or close request if scanner does not exists. 3118 if (closedScanners.getIfPresent(scannerName) != null) { 3119 throw SCANNER_ALREADY_CLOSED; 3120 } else { 3121 LOG.warn("Client tried to access missing scanner " + scannerName); 3122 throw new UnknownScannerException( 3123 "Unknown scanner '" + scannerName + "'. 
This can happen due to any of the following " 3124 + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " 3125 + "long wait between consecutive client checkins, c) Server may be closing down, " 3126 + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " 3127 + "possible fix would be increasing the value of" 3128 + "'hbase.client.scanner.timeout.period' configuration."); 3129 } 3130 } 3131 RegionInfo hri = rsh.s.getRegionInfo(); 3132 // Yes, should be the same instance 3133 if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) { 3134 String msg = "Region has changed on the scanner " + scannerName + ": regionName=" 3135 + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r; 3136 LOG.warn(msg + ", closing..."); 3137 scanners.remove(scannerName); 3138 try { 3139 rsh.s.close(); 3140 } catch (IOException e) { 3141 LOG.warn("Getting exception closing " + scannerName, e); 3142 } finally { 3143 try { 3144 regionServer.getLeaseManager().cancelLease(scannerName); 3145 } catch (LeaseException e) { 3146 LOG.warn("Getting exception closing " + scannerName, e); 3147 } 3148 } 3149 throw new NotServingRegionException(msg); 3150 } 3151 return rsh; 3152 } 3153 3154 /** 3155 * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder 3156 * value. 3157 */ 3158 private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request, 3159 ScanResponse.Builder builder) throws IOException { 3160 HRegion region = getRegion(request.getRegion()); 3161 ClientProtos.Scan protoScan = request.getScan(); 3162 boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); 3163 Scan scan = ProtobufUtil.toScan(protoScan); 3164 // if the request doesn't set this, get the default region setting. 
3165 if (!isLoadingCfsOnDemandSet) { 3166 scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); 3167 } 3168 3169 if (!scan.hasFamilies()) { 3170 // Adding all families to scanner 3171 for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) { 3172 scan.addFamily(family); 3173 } 3174 } 3175 if (region.getCoprocessorHost() != null) { 3176 // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a 3177 // wrapper for the core created RegionScanner 3178 region.getCoprocessorHost().preScannerOpen(scan); 3179 } 3180 RegionScannerImpl coreScanner = region.getScanner(scan); 3181 Shipper shipper = coreScanner; 3182 RegionScanner scanner = coreScanner; 3183 try { 3184 if (region.getCoprocessorHost() != null) { 3185 scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); 3186 } 3187 } catch (Exception e) { 3188 // Although region coprocessor is for advanced users and they should take care of the 3189 // implementation to not damage the HBase system, closing the scanner on exception here does 3190 // not have any bad side effect, so let's do it 3191 scanner.close(); 3192 throw e; 3193 } 3194 long scannerId = scannerIdGenerator.generateNewScannerId(); 3195 builder.setScannerId(scannerId); 3196 builder.setMvccReadPoint(scanner.getMvccReadPoint()); 3197 builder.setTtl(scannerLeaseTimeoutPeriod); 3198 String scannerName = toScannerName(scannerId); 3199 3200 boolean fullRegionScan = 3201 !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region); 3202 3203 return new Pair<String, RegionScannerHolder>(scannerName, 3204 addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan)); 3205 } 3206 3207 /** 3208 * The returned String is used as key doing look up of outstanding Scanners in this Servers' 3209 * this.scanners, the Map of outstanding scanners and their current state. 3210 * @param scannerId A scanner long id. 
3211 * @return The long id as a String. 3212 */ 3213 private static String toScannerName(long scannerId) { 3214 return Long.toString(scannerId); 3215 } 3216 3217 private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh) 3218 throws OutOfOrderScannerNextException { 3219 // if nextCallSeq does not match throw Exception straight away. This needs to be 3220 // performed even before checking of Lease. 3221 // See HBASE-5974 3222 if (request.hasNextCallSeq()) { 3223 long callSeq = request.getNextCallSeq(); 3224 if (!rsh.incNextCallSeq(callSeq)) { 3225 throw new OutOfOrderScannerNextException( 3226 "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: " 3227 + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request)); 3228 } 3229 } 3230 } 3231 3232 private void addScannerLeaseBack(LeaseManager.Lease lease) { 3233 try { 3234 regionServer.getLeaseManager().addLease(lease); 3235 } catch (LeaseStillHeldException e) { 3236 // should not happen as the scanner id is unique. 3237 throw new AssertionError(e); 3238 } 3239 } 3240 3241 // visible for testing only 3242 long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller, 3243 boolean allowHeartbeatMessages) { 3244 // Set the time limit to be half of the more restrictive timeout value (one of the 3245 // timeout values must be positive). In the event that both values are positive, the 3246 // more restrictive of the two is used to calculate the limit. 3247 if (allowHeartbeatMessages) { 3248 long now = EnvironmentEdgeManager.currentTime(); 3249 long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now); 3250 if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) { 3251 long timeLimitDelta; 3252 if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) { 3253 timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout); 3254 } else { 3255 timeLimitDelta = 3256 scannerLeaseTimeoutPeriod > 0 ? 
scannerLeaseTimeoutPeriod : remainingTimeout; 3257 } 3258 3259 // Use half of whichever timeout value was more restrictive... But don't allow 3260 // the time limit to be less than the allowable minimum (could cause an 3261 // immediate timeout before scanning any data). 3262 timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta); 3263 return now + timeLimitDelta; 3264 } 3265 } 3266 // Default value of timeLimit is negative to indicate no timeLimit should be 3267 // enforced. 3268 return -1L; 3269 } 3270 3271 private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) { 3272 long timeout; 3273 if (controller != null && controller.getCallTimeout() > 0) { 3274 timeout = controller.getCallTimeout(); 3275 } else if (rpcTimeout > 0) { 3276 timeout = rpcTimeout; 3277 } else { 3278 return -1; 3279 } 3280 if (call != null) { 3281 timeout -= (now - call.getReceiveTime()); 3282 } 3283 // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high. 3284 // return minimum value here in that case so we count this in calculating the final delta. 3285 return Math.max(minimumScanTimeLimitDelta, timeout); 3286 } 3287 3288 private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows, 3289 ScannerContext scannerContext, ScanResponse.Builder builder) { 3290 if (numOfCompleteRows >= limitOfRows) { 3291 if (LOG.isTraceEnabled()) { 3292 LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows 3293 + " scannerContext: " + scannerContext); 3294 } 3295 builder.setMoreResults(false); 3296 } 3297 } 3298 3299 // return whether we have more results in region. 
3300 private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh, 3301 long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results, 3302 ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCall rpcCall) 3303 throws IOException { 3304 HRegion region = rsh.r; 3305 RegionScanner scanner = rsh.s; 3306 long maxResultSize; 3307 if (scanner.getMaxResultSize() > 0) { 3308 maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize); 3309 } else { 3310 maxResultSize = maxQuotaResultSize; 3311 } 3312 // This is cells inside a row. Default size is 10 so if many versions or many cfs, 3313 // then we'll resize. Resizings show in profiler. Set it higher than 10. For now 3314 // arbitrary 32. TODO: keep record of general size of results being returned. 3315 ArrayList<Cell> values = new ArrayList<>(32); 3316 region.startRegionOperation(Operation.SCAN); 3317 long before = EnvironmentEdgeManager.currentTime(); 3318 // Used to check if we've matched the row limit set on the Scan 3319 int numOfCompleteRows = 0; 3320 // Count of times we call nextRaw; can be > numOfCompleteRows. 3321 int numOfNextRawCalls = 0; 3322 try { 3323 int numOfResults = 0; 3324 synchronized (scanner) { 3325 boolean stale = (region.getRegionInfo().getReplicaId() != 0); 3326 boolean clientHandlesPartials = 3327 request.hasClientHandlesPartials() && request.getClientHandlesPartials(); 3328 boolean clientHandlesHeartbeats = 3329 request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats(); 3330 3331 // On the server side we must ensure that the correct ordering of partial results is 3332 // returned to the client to allow them to properly reconstruct the partial results. 3333 // If the coprocessor host is adding to the result list, we cannot guarantee the 3334 // correct ordering of partial results and so we prevent partial results from being 3335 // formed. 
3336 boolean serverGuaranteesOrderOfPartials = results.isEmpty(); 3337 boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials; 3338 boolean moreRows = false; 3339 3340 // Heartbeat messages occur when the processing of the ScanRequest is exceeds a 3341 // certain time threshold on the server. When the time threshold is exceeded, the 3342 // server stops the scan and sends back whatever Results it has accumulated within 3343 // that time period (may be empty). Since heartbeat messages have the potential to 3344 // create partial Results (in the event that the timeout occurs in the middle of a 3345 // row), we must only generate heartbeat messages when the client can handle both 3346 // heartbeats AND partials 3347 boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults; 3348 3349 long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages); 3350 3351 final LimitScope sizeScope = 3352 allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; 3353 final LimitScope timeScope = 3354 allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; 3355 3356 boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics(); 3357 3358 // Configure with limits for this RPC. Set keep progress true since size progress 3359 // towards size limit should be kept between calls to nextRaw 3360 ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); 3361 // maxResultSize - either we can reach this much size for all cells(being read) data or sum 3362 // of heap size occupied by cells(being read). Cell data means its key and value parts. 
3363 contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize); 3364 contextBuilder.setBatchLimit(scanner.getBatch()); 3365 contextBuilder.setTimeLimit(timeScope, timeLimit); 3366 contextBuilder.setTrackMetrics(trackMetrics); 3367 ScannerContext scannerContext = contextBuilder.build(); 3368 boolean limitReached = false; 3369 while (numOfResults < maxResults) { 3370 // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The 3371 // batch limit is a limit on the number of cells per Result. Thus, if progress is 3372 // being tracked (i.e. scannerContext.keepProgress() is true) then we need to 3373 // reset the batch progress between nextRaw invocations since we don't want the 3374 // batch progress from previous calls to affect future calls 3375 scannerContext.setBatchProgress(0); 3376 assert values.isEmpty(); 3377 3378 // Collect values to be returned here 3379 moreRows = scanner.nextRaw(values, scannerContext); 3380 if (rpcCall == null) { 3381 // When there is no RpcCallContext,copy EC to heap, then the scanner would close, 3382 // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap 3383 // buffers.See more details in HBASE-26036. 3384 CellUtil.cloneIfNecessary(values); 3385 } 3386 numOfNextRawCalls++; 3387 3388 if (!values.isEmpty()) { 3389 if (limitOfRows > 0) { 3390 // First we need to check if the last result is partial and we have a row change. If 3391 // so then we need to increase the numOfCompleteRows. 
3392 if (results.isEmpty()) { 3393 if ( 3394 rsh.rowOfLastPartialResult != null 3395 && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult) 3396 ) { 3397 numOfCompleteRows++; 3398 checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, 3399 builder); 3400 } 3401 } else { 3402 Result lastResult = results.get(results.size() - 1); 3403 if ( 3404 lastResult.mayHaveMoreCellsInRow() 3405 && !CellUtil.matchingRows(values.get(0), lastResult.getRow()) 3406 ) { 3407 numOfCompleteRows++; 3408 checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, 3409 builder); 3410 } 3411 } 3412 if (builder.hasMoreResults() && !builder.getMoreResults()) { 3413 break; 3414 } 3415 } 3416 boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow(); 3417 Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow); 3418 lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue())); 3419 results.add(r); 3420 numOfResults++; 3421 if (!mayHaveMoreCellsInRow && limitOfRows > 0) { 3422 numOfCompleteRows++; 3423 checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder); 3424 if (builder.hasMoreResults() && !builder.getMoreResults()) { 3425 break; 3426 } 3427 } 3428 } else if (!moreRows && !results.isEmpty()) { 3429 // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of 3430 // last result is false. Otherwise it's possible that: the first nextRaw returned 3431 // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the 3432 // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will 3433 // return with moreRows=false, which means moreResultsInRegion would be false, it will 3434 // be a contradictory state (HBASE-21206). 
3435 int lastIdx = results.size() - 1; 3436 Result r = results.get(lastIdx); 3437 if (r.mayHaveMoreCellsInRow()) { 3438 results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false)); 3439 } 3440 } 3441 boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS); 3442 boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS); 3443 boolean resultsLimitReached = numOfResults >= maxResults; 3444 limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached; 3445 3446 if (limitReached || !moreRows) { 3447 // We only want to mark a ScanResponse as a heartbeat message in the event that 3448 // there are more values to be read server side. If there aren't more values, 3449 // marking it as a heartbeat is wasteful because the client will need to issue 3450 // another ScanRequest only to realize that they already have all the values 3451 if (moreRows && timeLimitReached) { 3452 // Heartbeat messages occur when the time limit has been reached. 3453 builder.setHeartbeatMessage(true); 3454 if (rsh.needCursor) { 3455 Cell cursorCell = scannerContext.getLastPeekedCell(); 3456 if (cursorCell != null) { 3457 builder.setCursor(ProtobufUtil.toCursor(cursorCell)); 3458 } 3459 } 3460 } 3461 break; 3462 } 3463 values.clear(); 3464 } 3465 builder.setMoreResultsInRegion(moreRows); 3466 // Check to see if the client requested that we track metrics server side. If the 3467 // client requested metrics, retrieve the metrics from the scanner context. 
3468 if (trackMetrics) { 3469 Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap(); 3470 ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder(); 3471 NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder(); 3472 3473 for (Entry<String, Long> entry : metrics.entrySet()) { 3474 pairBuilder.setName(entry.getKey()); 3475 pairBuilder.setValue(entry.getValue()); 3476 metricBuilder.addMetrics(pairBuilder.build()); 3477 } 3478 3479 builder.setScanMetrics(metricBuilder.build()); 3480 } 3481 } 3482 } finally { 3483 region.closeRegionOperation(); 3484 // Update serverside metrics, even on error. 3485 long end = EnvironmentEdgeManager.currentTime(); 3486 long responseCellSize = rpcCall != null ? rpcCall.getResponseCellSize() : 0; 3487 region.getMetrics().updateScanTime(end - before); 3488 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 3489 if (metricsRegionServer != null) { 3490 metricsRegionServer.updateScanSize(region.getTableDescriptor().getTableName(), 3491 responseCellSize); 3492 metricsRegionServer.updateScanTime(region.getTableDescriptor().getTableName(), 3493 end - before); 3494 metricsRegionServer.updateReadQueryMeter(region.getRegionInfo().getTable(), 3495 numOfNextRawCalls); 3496 } 3497 } 3498 // coprocessor postNext hook 3499 if (region.getCoprocessorHost() != null) { 3500 region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true); 3501 } 3502 } 3503 3504 /** 3505 * Scan data in a table. 3506 * @param controller the RPC controller 3507 * @param request the scan request n 3508 */ 3509 @Override 3510 public ScanResponse scan(final RpcController controller, final ScanRequest request) 3511 throws ServiceException { 3512 if (controller != null && !(controller instanceof HBaseRpcController)) { 3513 throw new UnsupportedOperationException( 3514 "We only do " + "HBaseRpcControllers! 
FIX IF A PROBLEM: " + controller); 3515 } 3516 if (!request.hasScannerId() && !request.hasScan()) { 3517 throw new ServiceException( 3518 new DoNotRetryIOException("Missing required input: scannerId or scan")); 3519 } 3520 try { 3521 checkOpen(); 3522 } catch (IOException e) { 3523 if (request.hasScannerId()) { 3524 String scannerName = toScannerName(request.getScannerId()); 3525 if (LOG.isDebugEnabled()) { 3526 LOG.debug( 3527 "Server shutting down and client tried to access missing scanner " + scannerName); 3528 } 3529 final LeaseManager leaseManager = regionServer.getLeaseManager(); 3530 if (leaseManager != null) { 3531 try { 3532 leaseManager.cancelLease(scannerName); 3533 } catch (LeaseException le) { 3534 // No problem, ignore 3535 if (LOG.isTraceEnabled()) { 3536 LOG.trace("Un-able to cancel lease of scanner. It could already be closed."); 3537 } 3538 } 3539 } 3540 } 3541 throw new ServiceException(e); 3542 } 3543 requestCount.increment(); 3544 rpcScanRequestCount.increment(); 3545 RegionScannerHolder rsh; 3546 ScanResponse.Builder builder = ScanResponse.newBuilder(); 3547 String scannerName; 3548 try { 3549 if (request.hasScannerId()) { 3550 // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000 3551 // for more details. 3552 long scannerId = request.getScannerId(); 3553 builder.setScannerId(scannerId); 3554 scannerName = toScannerName(scannerId); 3555 rsh = getRegionScanner(request); 3556 } else { 3557 Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder); 3558 scannerName = scannerNameAndRSH.getFirst(); 3559 rsh = scannerNameAndRSH.getSecond(); 3560 } 3561 } catch (IOException e) { 3562 if (e == SCANNER_ALREADY_CLOSED) { 3563 // Now we will close scanner automatically if there are no more results for this region but 3564 // the old client will still send a close request to us. Just ignore it and return. 
3565 return builder.build(); 3566 } 3567 throw new ServiceException(e); 3568 } 3569 if (rsh.fullRegionScan) { 3570 rpcFullScanRequestCount.increment(); 3571 } 3572 HRegion region = rsh.r; 3573 LeaseManager.Lease lease; 3574 try { 3575 // Remove lease while its being processed in server; protects against case 3576 // where processing of request takes > lease expiration time. or null if none found. 3577 lease = regionServer.getLeaseManager().removeLease(scannerName); 3578 } catch (LeaseException e) { 3579 throw new ServiceException(e); 3580 } 3581 if (request.hasRenew() && request.getRenew()) { 3582 // add back and return 3583 addScannerLeaseBack(lease); 3584 try { 3585 checkScanNextCallSeq(request, rsh); 3586 } catch (OutOfOrderScannerNextException e) { 3587 throw new ServiceException(e); 3588 } 3589 return builder.build(); 3590 } 3591 OperationQuota quota; 3592 try { 3593 quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN); 3594 } catch (IOException e) { 3595 addScannerLeaseBack(lease); 3596 throw new ServiceException(e); 3597 } 3598 try { 3599 checkScanNextCallSeq(request, rsh); 3600 } catch (OutOfOrderScannerNextException e) { 3601 addScannerLeaseBack(lease); 3602 throw new ServiceException(e); 3603 } 3604 // Now we have increased the next call sequence. If we give client an error, the retry will 3605 // never success. So we'd better close the scanner and return a DoNotRetryIOException to client 3606 // and then client will try to open a new scanner. 3607 boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false; 3608 int rows; // this is scan.getCaching 3609 if (request.hasNumberOfRows()) { 3610 rows = request.getNumberOfRows(); 3611 } else { 3612 rows = closeScanner ? 0 : 1; 3613 } 3614 RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null); 3615 // now let's do the real scan. 
3616 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); 3617 RegionScanner scanner = rsh.s; 3618 // this is the limit of rows for this scan, if we the number of rows reach this value, we will 3619 // close the scanner. 3620 int limitOfRows; 3621 if (request.hasLimitOfRows()) { 3622 limitOfRows = request.getLimitOfRows(); 3623 } else { 3624 limitOfRows = -1; 3625 } 3626 MutableObject<Object> lastBlock = new MutableObject<>(); 3627 boolean scannerClosed = false; 3628 try { 3629 List<Result> results = new ArrayList<>(Math.min(rows, 512)); 3630 if (rows > 0) { 3631 boolean done = false; 3632 // Call coprocessor. Get region info from scanner. 3633 if (region.getCoprocessorHost() != null) { 3634 Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows); 3635 if (!results.isEmpty()) { 3636 for (Result r : results) { 3637 lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue())); 3638 } 3639 } 3640 if (bypass != null && bypass.booleanValue()) { 3641 done = true; 3642 } 3643 } 3644 if (!done) { 3645 scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows, 3646 results, builder, lastBlock, rpcCall); 3647 } else { 3648 builder.setMoreResultsInRegion(!results.isEmpty()); 3649 } 3650 } else { 3651 // This is a open scanner call with numberOfRow = 0, so set more results in region to true. 3652 builder.setMoreResultsInRegion(true); 3653 } 3654 3655 quota.addScanResult(results); 3656 addResults(builder, results, (HBaseRpcController) controller, 3657 RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()), 3658 isClientCellBlockSupport(rpcCall)); 3659 if (scanner.isFilterDone() && results.isEmpty()) { 3660 // If the scanner's filter - if any - is done with the scan 3661 // only set moreResults to false if the results is empty. This is used to keep compatible 3662 // with the old scan implementation where we just ignore the returned results if moreResults 3663 // is false. 
Can remove the isEmpty check after we get rid of the old implementation. 3664 builder.setMoreResults(false); 3665 } 3666 // Later we may close the scanner depending on this flag so here we need to make sure that we 3667 // have already set this flag. 3668 assert builder.hasMoreResultsInRegion(); 3669 // we only set moreResults to false in the above code, so set it to true if we haven't set it 3670 // yet. 3671 if (!builder.hasMoreResults()) { 3672 builder.setMoreResults(true); 3673 } 3674 if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) { 3675 // Record the last cell of the last result if it is a partial result 3676 // We need this to calculate the complete rows we have returned to client as the 3677 // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the 3678 // current row. We may filter out all the remaining cells for the current row and just 3679 // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to 3680 // check for row change. 3681 Result lastResult = results.get(results.size() - 1); 3682 if (lastResult.mayHaveMoreCellsInRow()) { 3683 rsh.rowOfLastPartialResult = lastResult.getRow(); 3684 } else { 3685 rsh.rowOfLastPartialResult = null; 3686 } 3687 } 3688 if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) { 3689 scannerClosed = true; 3690 closeScanner(region, scanner, scannerName, rpcCall); 3691 } 3692 3693 // There's no point returning to a timed out client. 
Throwing ensures scanner is closed 3694 if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) { 3695 throw new TimeoutIOException("Client deadline exceeded, cannot return results"); 3696 } 3697 3698 return builder.build(); 3699 } catch (IOException e) { 3700 try { 3701 // scanner is closed here 3702 scannerClosed = true; 3703 // The scanner state might be left in a dirty state, so we will tell the Client to 3704 // fail this RPC and close the scanner while opening up another one from the start of 3705 // row that the client has last seen. 3706 closeScanner(region, scanner, scannerName, rpcCall); 3707 3708 // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is 3709 // used in two different semantics. 3710 // (1) The first is to close the client scanner and bubble up the exception all the way 3711 // to the application. This is preferred when the exception is really un-recoverable 3712 // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this 3713 // bucket usually. 3714 // (2) Second semantics is to close the current region scanner only, but continue the 3715 // client scanner by overriding the exception. This is usually UnknownScannerException, 3716 // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the 3717 // application-level ClientScanner has to continue without bubbling up the exception to 3718 // the client. See ClientScanner code to see how it deals with these special exceptions. 3719 if (e instanceof DoNotRetryIOException) { 3720 throw e; 3721 } 3722 3723 // If it is a FileNotFoundException, wrap as a 3724 // DoNotRetryIOException. This can avoid the retry in ClientScanner. 3725 if (e instanceof FileNotFoundException) { 3726 throw new DoNotRetryIOException(e); 3727 } 3728 3729 // We closed the scanner already. 
Instead of throwing the IOException, and client 3730 // retrying with the same scannerId only to get USE on the next RPC, we directly throw 3731 // a special exception to save an RPC. 3732 if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) { 3733 // 1.4.0+ clients know how to handle 3734 throw new ScannerResetException("Scanner is closed on the server-side", e); 3735 } else { 3736 // older clients do not know about SRE. Just throw USE, which they will handle 3737 throw new UnknownScannerException("Throwing UnknownScannerException to reset the client" 3738 + " scanner state for clients older than 1.3.", e); 3739 } 3740 } catch (IOException ioe) { 3741 throw new ServiceException(ioe); 3742 } 3743 } finally { 3744 if (!scannerClosed) { 3745 // Adding resets expiration time on lease. 3746 // the closeCallBack will be set in closeScanner so here we only care about shippedCallback 3747 if (rpcCall != null) { 3748 rpcCall.setCallBack(rsh.shippedCallback); 3749 } else { 3750 // If context is null,here we call rsh.shippedCallback directly to reuse the logic in 3751 // rsh.shippedCallback to release the internal resources in rsh,and lease is also added 3752 // back to regionserver's LeaseManager in rsh.shippedCallback. 3753 runShippedCallback(rsh); 3754 } 3755 } 3756 quota.close(); 3757 } 3758 } 3759 3760 private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException { 3761 assert rsh.shippedCallback != null; 3762 try { 3763 rsh.shippedCallback.run(); 3764 } catch (IOException ioe) { 3765 throw new ServiceException(ioe); 3766 } 3767 } 3768 3769 private void closeScanner(HRegion region, RegionScanner scanner, String scannerName, 3770 RpcCallContext context) throws IOException { 3771 if (region.getCoprocessorHost() != null) { 3772 if (region.getCoprocessorHost().preScannerClose(scanner)) { 3773 // bypass the actual close. 
        return;
      }
    }
    RegionScannerHolder rsh = scanners.remove(scannerName);
    if (rsh != null) {
      if (context != null) {
        // Defer the real close to the RPC framework via the holder's close callback.
        context.setCallBack(rsh.closeCallBack);
      } else {
        // No RPC context: close the underlying scanner synchronously.
        rsh.s.close();
      }
      if (region.getCoprocessorHost() != null) {
        region.getCoprocessorHost().postScannerClose(scanner);
      }
      closedScanners.put(scannerName, scannerName);
    }
  }

  /**
   * Executes a coprocessor endpoint registered at the region-server level, delegating to the
   * region server after the standard RPC pre-check.
   */
  @Override
  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
    CoprocessorServiceRequest request) throws ServiceException {
    rpcPreCheck("execRegionServerService");
    return regionServer.execRegionServerService(controller, request);
  }

  /**
   * Reloads the region server's configuration. Requires ADMIN permission; any failure is wrapped
   * in a {@link ServiceException}.
   */
  @Override
  public UpdateConfigurationResponse updateConfiguration(RpcController controller,
    UpdateConfigurationRequest request) throws ServiceException {
    try {
      requirePermission("updateConfiguration", Permission.Action.ADMIN);
      this.regionServer.updateConfiguration();
    } catch (Exception e) {
      throw new ServiceException(e);
    }
    return UpdateConfigurationResponse.getDefaultInstance();
  }

  /**
   * Returns the space-quota snapshots currently known to this region server, one entry per table.
   * Returns an empty response when the quota manager is not available.
   */
  @Override
  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
    GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
    try {
      final RegionServerSpaceQuotaManager manager = regionServer.getRegionServerSpaceQuotaManager();
      final GetSpaceQuotaSnapshotsResponse.Builder builder =
        GetSpaceQuotaSnapshotsResponse.newBuilder();
      if (manager != null) {
        // Work on a copy so we do not race with concurrent snapshot updates.
        final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
        for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
            .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
            .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build());
        }
      }
      return builder.build();
    } catch (Exception e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Evicts blocks for the requested regions from the block cache. Per-region failures are
   * collected into the returned stats rather than failing the whole request.
   */
  @Override
  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
    ClearRegionBlockCacheRequest request) throws ServiceException {
    rpcPreCheck("clearRegionBlockCache");
    ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder();
    CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
    List<HRegion> regions = getRegions(request.getRegionList(), stats);
    for (HRegion region : regions) {
      try {
        stats = stats.append(this.regionServer.clearRegionBlockCache(region));
      } catch (Exception e) {
        // Record the failure against this region and continue with the rest.
        stats.addException(region.getRegionInfo().getRegionName(), e);
      }
    }
    stats.withMaxCacheSize(regionServer.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
    return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
  }

  /**
   * Schedules an {@code AssignRegionHandler} for each region in the open request. Table
   * descriptors are looked up through {@code tdCache} to avoid re-reading them for every region;
   * favored-node mappings are updated when provided.
   * @param request the master's open-region request
   * @param tdCache shared per-call cache of table descriptors, populated on demand
   */
  private void executeOpenRegionProcedures(OpenRegionRequest request,
    Map<TableName, TableDescriptor> tdCache) {
    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableName tableName = regionInfo.getTable();
      TableDescriptor tableDesc = tdCache.get(tableName);
      if (tableDesc == null) {
        try {
          tableDesc = regionServer.getTableDescriptors().get(regionInfo.getTable());
        } catch (IOException e) {
          // Here we do not fail the whole method since we also need deal with other
          // procedures, and we can not ignore this one, so we still schedule a
          // AssignRegionHandler and it will report back to master if we still can not get the
          // TableDescriptor.
          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
            regionInfo.getTable(), e);
        }
        if (tableDesc != null) {
          tdCache.put(tableName, tableDesc);
        }
      }
      if (regionOpenInfo.getFavoredNodesCount() > 0) {
        regionServer.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
          regionOpenInfo.getFavoredNodesList());
      }
      long procId = regionOpenInfo.getOpenProcId();
      // Only schedule the handler if this procedure id has not been submitted already.
      if (regionServer.submitRegionProcedure(procId)) {
        regionServer.executorService.submit(AssignRegionHandler.create(regionServer, regionInfo,
          procId, tableDesc, masterSystemTime));
      }
    }
  }

  /**
   * Schedules an {@code UnassignRegionHandler} for the region named in the close request,
   * provided the procedure id has not been submitted before.
   * @param request the master's close-region request
   */
  private void executeCloseRegionProcedures(CloseRegionRequest request) {
    String encodedName;
    try {
      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
    } catch (DoNotRetryIOException e) {
      // The region specifier always comes from the master, so it should always be decodable.
      throw new UncheckedIOException("Should not happen", e);
    }
    ServerName destination = request.hasDestinationServer()
      ? ProtobufUtil.toServerName(request.getDestinationServer())
      : null;
    long procId = request.getCloseProcId();
    if (regionServer.submitRegionProcedure(procId)) {
      regionServer.executorService.submit(
        UnassignRegionHandler.create(regionServer, encodedName, procId, false, destination));
    }
  }

  /**
   * Instantiates and runs a single remote procedure. Instantiation failures are reported back to
   * the master via {@code remoteProcedureComplete} instead of being thrown.
   * @param request carries the procedure class name, serialized payload and procedure id
   */
  private void executeProcedures(RemoteProcedureRequest request) {
    RSProcedureCallable callable;
    try {
      // The procedure class is chosen by the master; reflectively construct it.
      callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
        .getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(),
        request.getProcId(), e);
      regionServer.remoteProcedureComplete(request.getProcId(), e);
      return;
    }
    callable.init(request.getProcData().toByteArray(), regionServer);
    LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
    regionServer.executeProcedure(request.getProcId(), callable);
  }

  /**
   * Master-to-region-server entry point that dispatches batched open-region, close-region and
   * generic remote procedures, bracketed by the region-server coprocessor pre/post hooks.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ExecuteProceduresResponse executeProcedures(RpcController controller,
    ExecuteProceduresRequest request) throws ServiceException {
    try {
      checkOpen();
      throwOnWrongStartCode(request);
      regionServer.getRegionServerCoprocessorHost().preExecuteProcedures();
      if (request.getOpenRegionCount() > 0) {
        // Avoid reading from the TableDescritor every time(usually it will read from the file
        // system)
        Map<TableName, TableDescriptor> tdCache = new HashMap<>();
        request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
      }
      if (request.getCloseRegionCount() > 0) {
        request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
      }
      if (request.getProcCount() > 0) {
        request.getProcList().forEach(this::executeProcedures);
      }
      regionServer.getRegionServerCoprocessorHost().postExecuteProcedures();
      return ExecuteProceduresResponse.getDefaultInstance();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Returns the recorded slow-log RPC payloads matching the request filters. Returns an empty
   * list when the named-queue recorder is not enabled.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public SlowLogResponses getSlowLogResponses(final RpcController controller,
    final SlowLogResponseRequest request) {
    final NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder();
    final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder);
    SlowLogResponses slowLogResponses =
      SlowLogResponses.newBuilder().addAllSlowLogPayloads(slowLogPayloads).build();
    return slowLogResponses;
  }

  /**
   * Fetches slow-log payloads from the named-queue recorder for the given request.
   * @param request            filter criteria forwarded to the recorder
   * @param namedQueueRecorder recorder to query; may be null when the feature is disabled
   * @return matching payloads, or an empty list when the recorder or its response is null
   */
  private List<SlowLogPayload> getSlowLogPayloads(SlowLogResponseRequest request,
    NamedQueueRecorder namedQueueRecorder) {
    if (namedQueueRecorder == null) {
      return Collections.emptyList();
    }
    List<SlowLogPayload> slowLogPayloads;
    NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
    namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
    namedQueueGetRequest.setSlowLogResponseRequest(request);
    NamedQueueGetResponse namedQueueGetResponse =
      namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
    slowLogPayloads = namedQueueGetResponse != null
      ? namedQueueGetResponse.getSlowLogPayloads()
      : Collections.emptyList();
    return slowLogPayloads;
  }

  /**
   * Returns the recorded large-response RPC payloads. Note this queries the same slow-log event
   * queue as {@code getSlowLogResponses}; the request carries the large/slow distinction.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public SlowLogResponses getLargeLogResponses(final RpcController controller,
    final SlowLogResponseRequest request) {
    final NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder();
    final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder);
    SlowLogResponses slowLogResponses =
      SlowLogResponses.newBuilder().addAllSlowLogPayloads(slowLogPayloads).build();
    return slowLogResponses;
  }

  /**
   * Clears the slow-log named queue on this server. Reports {@code isCleaned=false} when the
   * recorder is not enabled.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller,
    final ClearSlowLogResponseRequest request) throws ServiceException {
    rpcPreCheck("clearSlowLogsResponses");
    final NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder();
    boolean slowLogsCleaned = Optional.ofNullable(namedQueueRecorder)
      .map(
        queueRecorder -> queueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG))
      .orElse(false);
    ClearSlowLogResponses clearSlowLogResponses =
      ClearSlowLogResponses.newBuilder().setIsCleaned(slowLogsCleaned).build();
    return clearSlowLogResponses;
  }

  /**
   * Generic log-entry retrieval RPC. Reflectively parses the embedded request message by its
   * declared class name; currently only {@code SlowLogResponseRequest} payloads are handled, and
   * any other class name results in a {@link ServiceException}.
   */
  @Override
  public HBaseProtos.LogEntry getLogEntries(RpcController controller,
    HBaseProtos.LogRequest request) throws ServiceException {
    try {
      final String logClassName = request.getLogClassName();
      Class<?> logClass = Class.forName(logClassName).asSubclass(Message.class);
      // Every protobuf Message exposes a static parseFrom(ByteString) method.
      Method method = logClass.getMethod("parseFrom", ByteString.class);
      if (logClassName.contains("SlowLogResponseRequest")) {
        SlowLogResponseRequest slowLogResponseRequest =
          (SlowLogResponseRequest) method.invoke(null, request.getLogMessage());
        final
NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder();
        final List<SlowLogPayload> slowLogPayloads =
          getSlowLogPayloads(slowLogResponseRequest, namedQueueRecorder);
        SlowLogResponses slowLogResponses =
          SlowLogResponses.newBuilder().addAllSlowLogPayloads(slowLogPayloads).build();
        return HBaseProtos.LogEntry.newBuilder()
          .setLogClassName(slowLogResponses.getClass().getName())
          .setLogMessage(slowLogResponses.toByteString()).build();
      }
    } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
      | InvocationTargetException e) {
      LOG.error("Error while retrieving log entries.", e);
      throw new ServiceException(e);
    }
    // Reached when the log class name is not one of the supported request types.
    throw new ServiceException("Invalid request params");
  }

  /** Returns the scheduler of the underlying RPC server. */
  public RpcScheduler getRpcScheduler() {
    return rpcServer.getScheduler();
  }

  /** Returns the access checker used for permission checks on admin RPCs. */
  protected AccessChecker getAccessChecker() {
    return accessChecker;
  }

  /** Returns the ZooKeeper permission watcher backing the access checker. */
  protected ZKPermissionWatcher getZkPermissionWatcher() {
    return zkPermissionWatcher;
  }

  /** Returns the cluster id known to this region server. */
  @Override
  public GetClusterIdResponse getClusterId(RpcController controller, GetClusterIdRequest request)
    throws ServiceException {
    return GetClusterIdResponse.newBuilder().setClusterId(regionServer.getClusterId()).build();
  }

  /**
   * Returns the currently active master, if one is known; the response's server name is left
   * unset otherwise.
   */
  @Override
  public GetActiveMasterResponse getActiveMaster(RpcController controller,
    GetActiveMasterRequest request) throws ServiceException {
    GetActiveMasterResponse.Builder builder = GetActiveMasterResponse.newBuilder();
    regionServer.getActiveMaster()
      .ifPresent(name -> builder.setServerName(ProtobufUtil.toServerName(name)));
    return builder.build();
  }

  /**
   * Returns all known masters: the active master (if any) flagged {@code isActive=true}, followed
   * by the backup masters flagged {@code isActive=false}.
   */
  @Override
  public GetMastersResponse getMasters(RpcController controller, GetMastersRequest request)
    throws ServiceException {
    GetMastersResponse.Builder builder = GetMastersResponse.newBuilder();
    regionServer.getActiveMaster()
      .ifPresent(activeMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder()
        .setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true)));
    regionServer.getBackupMasters()
      .forEach(backupMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder()
        .setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false)));
    return builder.build();
  }

  /**
   * Returns the cached locations of the meta region replicas; the response is empty when the
   * cache has no locations yet.
   */
  @Override
  public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController controller,
    GetMetaRegionLocationsRequest request) throws ServiceException {
    GetMetaRegionLocationsResponse.Builder builder = GetMetaRegionLocationsResponse.newBuilder();
    Optional<List<HRegionLocation>> metaLocations =
      regionServer.getMetaRegionLocationCache().getMetaRegionLocations();
    metaLocations.ifPresent(hRegionLocations -> hRegionLocations
      .forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location))));
    return builder.build();
  }

  /**
   * Returns a bounded random sample of known region servers for client bootstrap. The sample size
   * is capped by the {@code CLIENT_BOOTSTRAP_NODE_LIMIT} configuration.
   */
  @Override
  public final GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
    GetBootstrapNodesRequest request) throws ServiceException {
    int maxNodeCount = regionServer.getConfiguration().getInt(CLIENT_BOOTSTRAP_NODE_LIMIT,
      DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
    // Reservoir sampling gives each server an equal chance regardless of list size.
    ReservoirSample<ServerName> sample = new ReservoirSample<>(maxNodeCount);
    sample.add(regionServer.getRegionServers());

    GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
    sample.getSamplingResult().stream().map(ProtobufUtil::toServerName)
      .forEach(builder::addServerName);
    return builder.build();
  }
}