001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.UncheckedIOException; 024import java.net.BindException; 025import java.net.InetAddress; 026import java.net.InetSocketAddress; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.Iterator; 032import java.util.List; 033import java.util.Map; 034import java.util.Map.Entry; 035import java.util.NavigableMap; 036import java.util.Set; 037import java.util.TreeSet; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentMap; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicBoolean; 042import java.util.concurrent.atomic.AtomicLong; 043import java.util.concurrent.atomic.LongAdder; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.fs.FileSystem; 046import org.apache.hadoop.fs.Path; 047import org.apache.hadoop.hbase.CacheEvictionStats; 048import 
org.apache.hadoop.hbase.CacheEvictionStatsBuilder; 049import org.apache.hadoop.hbase.Cell; 050import org.apache.hadoop.hbase.CellScannable; 051import org.apache.hadoop.hbase.CellScanner; 052import org.apache.hadoop.hbase.CellUtil; 053import org.apache.hadoop.hbase.DoNotRetryIOException; 054import org.apache.hadoop.hbase.DroppedSnapshotException; 055import org.apache.hadoop.hbase.HBaseIOException; 056import org.apache.hadoop.hbase.HBaseRpcServicesBase; 057import org.apache.hadoop.hbase.HConstants; 058import org.apache.hadoop.hbase.MultiActionResultTooLarge; 059import org.apache.hadoop.hbase.NotServingRegionException; 060import org.apache.hadoop.hbase.PrivateCellUtil; 061import org.apache.hadoop.hbase.RegionTooBusyException; 062import org.apache.hadoop.hbase.Server; 063import org.apache.hadoop.hbase.ServerName; 064import org.apache.hadoop.hbase.TableName; 065import org.apache.hadoop.hbase.UnknownScannerException; 066import org.apache.hadoop.hbase.client.Append; 067import org.apache.hadoop.hbase.client.CheckAndMutate; 068import org.apache.hadoop.hbase.client.CheckAndMutateResult; 069import org.apache.hadoop.hbase.client.Delete; 070import org.apache.hadoop.hbase.client.Durability; 071import org.apache.hadoop.hbase.client.Get; 072import org.apache.hadoop.hbase.client.Increment; 073import org.apache.hadoop.hbase.client.Mutation; 074import org.apache.hadoop.hbase.client.OperationWithAttributes; 075import org.apache.hadoop.hbase.client.Put; 076import org.apache.hadoop.hbase.client.RegionInfo; 077import org.apache.hadoop.hbase.client.RegionReplicaUtil; 078import org.apache.hadoop.hbase.client.Result; 079import org.apache.hadoop.hbase.client.Row; 080import org.apache.hadoop.hbase.client.Scan; 081import org.apache.hadoop.hbase.client.TableDescriptor; 082import org.apache.hadoop.hbase.client.VersionInfoUtil; 083import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 084import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 085import 
org.apache.hadoop.hbase.exceptions.ScannerResetException; 086import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 087import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 088import org.apache.hadoop.hbase.io.ByteBuffAllocator; 089import org.apache.hadoop.hbase.io.hfile.BlockCache; 090import org.apache.hadoop.hbase.ipc.HBaseRpcController; 091import org.apache.hadoop.hbase.ipc.PriorityFunction; 092import org.apache.hadoop.hbase.ipc.QosPriority; 093import org.apache.hadoop.hbase.ipc.RpcCall; 094import org.apache.hadoop.hbase.ipc.RpcCallContext; 095import org.apache.hadoop.hbase.ipc.RpcCallback; 096import org.apache.hadoop.hbase.ipc.RpcServer; 097import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 098import org.apache.hadoop.hbase.ipc.RpcServerFactory; 099import org.apache.hadoop.hbase.ipc.RpcServerInterface; 100import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 101import org.apache.hadoop.hbase.ipc.ServerRpcController; 102import org.apache.hadoop.hbase.net.Address; 103import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 104import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; 105import org.apache.hadoop.hbase.quotas.OperationQuota; 106import org.apache.hadoop.hbase.quotas.QuotaUtil; 107import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 108import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 109import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 110import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; 111import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease; 112import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException; 113import org.apache.hadoop.hbase.regionserver.Region.Operation; 114import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 115import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 116import 
org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler; 117import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; 118import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; 119import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; 120import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; 121import org.apache.hadoop.hbase.replication.ReplicationUtils; 122import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker; 123import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker; 124import org.apache.hadoop.hbase.security.Superusers; 125import org.apache.hadoop.hbase.security.access.Permission; 126import org.apache.hadoop.hbase.util.Bytes; 127import org.apache.hadoop.hbase.util.DNS; 128import org.apache.hadoop.hbase.util.DNS.ServerType; 129import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 130import org.apache.hadoop.hbase.util.Pair; 131import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 132import org.apache.hadoop.hbase.wal.WAL; 133import org.apache.hadoop.hbase.wal.WALEdit; 134import org.apache.hadoop.hbase.wal.WALKey; 135import org.apache.hadoop.hbase.wal.WALSplitUtil; 136import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 137import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 138import org.apache.yetus.audience.InterfaceAudience; 139import org.slf4j.Logger; 140import org.slf4j.LoggerFactory; 141 142import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 143import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 144import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 145import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 146import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 147import org.apache.hbase.thirdparty.com.google.protobuf.Message; 148import 
org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 149import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 150import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 151import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 152import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 153 154import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 155import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 156import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 171import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 191import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; 212import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; 234import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;

/**
 * Implements the regionserver RPC services.
 */
@InterfaceAudience.Private
public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
  implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface {

  private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);

  /** RPC scheduler to use for the region server. */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Upper bound applied to response sizes; combined with the per-request quota limit where
  // multi responses are assembled (see doNonAtomicRegionMutation). Volatile, so it can be
  // re-read after runtime reconfiguration — NOTE(review): the writer is outside this chunk.
  private volatile long maxScannerResultSize;

  // Produces ids for new region scanners. NOTE(review): initialized outside this chunk;
  // confirm it is assigned before the first scan request is served.
  private ScannerIdGenerator scannerIdGenerator;

  // Scanners currently open on this server, keyed by scanner name. Entries are removed when a
  // scanner is closed or when its lease expires (see ScannerListener.leaseExpired).
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
  // which may send next or close request to a region scanner which has already been exhausted. The
  // entries will be removed automatically after scannerLeaseTimeoutPeriod.
  private final Cache<String, String> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private volatile int rowSizeWarnThreshold;
  /*
   * Whether we should reject requests with very high no of rows i.e. beyond threshold defined by
   * rowSizeWarnThreshold
   */
  private volatile boolean rejectRowsWithSizeOverThreshold;

  // Guard flag for the clear-compaction-queues admin RPC — NOTE(review): semantics inferred
  // from the name; it is toggled outside this chunk. Presumably set while a clear request is
  // in flight so overlapping requests can be detected; confirm against clearCompactionQueues().
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below booleans
   * to selectively enable/disable these services (Rare is the case where you would ever turn off
   * one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
    "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
    "hbase.regionserver.client.executorService";
  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
    "hbase.regionserver.client.meta.executorService";
  public static final String REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG =
    "hbase.regionserver.bootstrap.nodes.executorService";

  /**
   * An Rpc callback for closing a RegionScanner.
353 */ 354 private static final class RegionScannerCloseCallBack implements RpcCallback { 355 356 private final RegionScanner scanner; 357 358 public RegionScannerCloseCallBack(RegionScanner scanner) { 359 this.scanner = scanner; 360 } 361 362 @Override 363 public void run() throws IOException { 364 this.scanner.close(); 365 } 366 } 367 368 /** 369 * An Rpc callback for doing shipped() call on a RegionScanner. 370 */ 371 private class RegionScannerShippedCallBack implements RpcCallback { 372 private final String scannerName; 373 private final Shipper shipper; 374 private final Lease lease; 375 376 public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) { 377 this.scannerName = scannerName; 378 this.shipper = shipper; 379 this.lease = lease; 380 } 381 382 @Override 383 public void run() throws IOException { 384 this.shipper.shipped(); 385 // We're done. On way out re-add the above removed lease. The lease was temp removed for this 386 // Rpc call and we are at end of the call now. Time to add it back. 387 if (scanners.containsKey(scannerName)) { 388 if (lease != null) { 389 server.getLeaseManager().addLease(lease); 390 } 391 } 392 } 393 } 394 395 /** 396 * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on 397 * completion of multiGets. 398 */ 399 static class RegionScannersCloseCallBack implements RpcCallback { 400 private final List<RegionScanner> scanners = new ArrayList<>(); 401 402 public void addScanner(RegionScanner scanner) { 403 this.scanners.add(scanner); 404 } 405 406 @Override 407 public void run() { 408 for (RegionScanner scanner : scanners) { 409 try { 410 scanner.close(); 411 } catch (IOException e) { 412 LOG.error("Exception while closing the scanner " + scanner, e); 413 } 414 } 415 } 416 } 417 418 /** 419 * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together. 
 */
  static final class RegionScannerHolder {
    // Sequence number expected from the client's next scan RPC; advanced only through the CAS
    // in incNextCallSeq so a single request per sequence value can proceed.
    private final AtomicLong nextCallSeq = new AtomicLong(0);
    // The scanner and the region it reads from.
    private final RegionScanner s;
    private final HRegion r;
    // Callbacks run by the RPC layer: close on scanner teardown, shipped after each call.
    private final RpcCallback closeCallBack;
    private final RpcCallback shippedCallback;
    private byte[] rowOfLastPartialResult;
    private boolean needCursor;
    private boolean fullRegionScan;
    // Kept for diagnostics; surfaced via toString() in lease-expiry log messages.
    private final String clientIPAndPort;
    private final String userName;
    // Block-bytes-scanned bookkeeping. Volatile so readers on other threads see fresh values;
    // NOTE(review): updateBlockBytesScanned is not atomic as a whole — it appears to be driven
    // by the single RPC handler owning this scanner; confirm before relying on concurrency.
    private volatile long maxBlockBytesScanned = 0;
    private volatile long prevBlockBytesScanned = 0;
    private volatile long prevBlockBytesScannedDifference = 0;

    RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
      RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
      String clientIPAndPort, String userName) {
      this.s = s;
      this.r = r;
      this.closeCallBack = closeCallBack;
      this.shippedCallback = shippedCallback;
      this.needCursor = needCursor;
      this.fullRegionScan = fullRegionScan;
      this.clientIPAndPort = clientIPAndPort;
      this.userName = userName;
    }

    // Returns the sequence number the next scan request must carry.
    long getNextCallSeq() {
      return nextCallSeq.get();
    }

    boolean incNextCallSeq(long currentSeq) {
      // Use CAS to prevent multiple scan request running on the same scanner.
      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
    }

    long getMaxBlockBytesScanned() {
      return maxBlockBytesScanned;
    }

    long getPrevBlockBytesScannedDifference() {
      return prevBlockBytesScannedDifference;
    }

    // Records the cumulative block bytes scanned after a call, tracking the per-call delta
    // and the running maximum.
    void updateBlockBytesScanned(long blockBytesScanned) {
      prevBlockBytesScannedDifference = blockBytesScanned - prevBlockBytesScanned;
      prevBlockBytesScanned = blockBytesScanned;
      if (blockBytesScanned > maxBlockBytesScanned) {
        maxBlockBytesScanned = blockBytesScanned;
      }
    }

    // Should be called only when we need to print lease expired messages otherwise
    // cache the String once made.
@Override
    public String toString() {
      // Built on demand; intended for lease-expiry log messages only.
      return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName
        + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString();
    }
  }

  /**
   * Instantiated as a scanner lease. If the lease times out, the scanner is closed
   */
  private class ScannerListener implements LeaseListener {
    private final String scannerName;

    ScannerListener(final String n) {
      this.scannerName = n;
    }

    @Override
    public void leaseExpired() {
      // Remove the holder first so concurrent requests for this scanner stop finding it.
      RegionScannerHolder rsh = scanners.remove(this.scannerName);
      if (rsh == null) {
        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
        return;
      }
      LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
      server.getMetrics().incrScannerLeaseExpired();
      RegionScanner s = rsh.s;
      HRegion region = null;
      try {
        region = server.getRegion(s.getRegionInfo().getRegionName());
        // Let coprocessors act before the scanner is closed.
        if (region != null && region.getCoprocessorHost() != null) {
          region.getCoprocessorHost().preScannerClose(s);
        }
      } catch (IOException e) {
        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
      } finally {
        // Close unconditionally (even if the pre-hook failed), then run the post-close hook.
        // Failures here are logged rather than propagated — the lease has already expired.
        try {
          s.close();
          if (region != null && region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().postScannerClose(s);
          }
        } catch (IOException e) {
          LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
        }
      }
    }
  }

  // Convenience overloads that wrap a protobuf result or an exception into the
  // ResultOrException slot for the multi response at the given action index.
  private static ResultOrException getResultOrException(final ClientProtos.Result r,
    final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(r), index);
  }

  private static ResultOrException getResultOrException(final Exception e, final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(e), index);
  }

  private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
    final int index)
{ 535 return builder.setIndex(index).build(); 536 } 537 538 /** 539 * Checks for the following pre-checks in order: 540 * <ol> 541 * <li>RegionServer is running</li> 542 * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li> 543 * </ol> 544 * @param requestName name of rpc request. Used in reporting failures to provide context. 545 * @throws ServiceException If any of the above listed pre-check fails. 546 */ 547 private void rpcPreCheck(String requestName) throws ServiceException { 548 try { 549 checkOpen(); 550 requirePermission(requestName, Permission.Action.ADMIN); 551 } catch (IOException ioe) { 552 throw new ServiceException(ioe); 553 } 554 } 555 556 private boolean isClientCellBlockSupport(RpcCallContext context) { 557 return context != null && context.isClientCellBlockSupported(); 558 } 559 560 private void addResult(final MutateResponse.Builder builder, final Result result, 561 final HBaseRpcController rpcc, boolean clientCellBlockSupported) { 562 if (result == null) return; 563 if (clientCellBlockSupported) { 564 builder.setResult(ProtobufUtil.toResultNoData(result)); 565 rpcc.setCellScanner(result.cellScanner()); 566 } else { 567 ClientProtos.Result pbr = ProtobufUtil.toResult(result); 568 builder.setResult(pbr); 569 } 570 } 571 572 private void addResults(ScanResponse.Builder builder, List<Result> results, 573 HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) { 574 builder.setStale(!isDefaultRegion); 575 if (results.isEmpty()) { 576 return; 577 } 578 if (clientCellBlockSupported) { 579 for (Result res : results) { 580 builder.addCellsPerResult(res.size()); 581 builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow()); 582 } 583 controller.setCellScanner(CellUtil.createCellScanner(results)); 584 } else { 585 for (Result res : results) { 586 ClientProtos.Result pbr = ProtobufUtil.toResult(res); 587 builder.addResults(pbr); 588 } 589 } 590 } 591 592 private CheckAndMutateResult 
checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
    CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Number of mutations whose cells were fully consumed from cellScanner. The finally block
    // uses it to skip the cells of any remaining actions so the scanner stays aligned.
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      List<Mutation> mutations = new ArrayList<>();
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : actions) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto mutation = action.getMutation();
        MutationType type = mutation.getMutateType();
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(mutation, cellScanner);
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            mutations.add(put);
            break;
          case DELETE:
            Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
            ++countOfCompleteMutation;
            // NOTE(review): unlike PUT/INCREMENT/APPEND there is no checkCellSizeLimit here —
            // presumably because a delete carries no new cell data; confirm it is intentional.
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            mutations.add(del);
            break;
          case INCREMENT:
            Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
            // NOTE(review): nonce is overwritten for each INCREMENT/APPEND action, so when
            // several are present only the last nonce reaches region.checkAndMutate; confirm.
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, increment);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
            mutations.add(increment);
            break;
          case APPEND:
            Append append = ProtobufUtil.toAppend(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, append);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
            mutations.add(append);
            break;
          default:
            throw new DoNotRetryIOException("invalid mutation type : " + type);
        }
      }

      if (mutations.size() == 0) {
        // No mutations to apply: report a successful (vacuous) check.
        return new CheckAndMutateResult(true, null);
      } else {
        CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
        CheckAndMutateResult result = null;
        // A non-null result from the pre-hook bypasses the region operation entirely.
        if (region.getCoprocessorHost() != null) {
          result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
        }
        if (result == null) {
          result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
          if (region.getCoprocessorHost() != null) {
            result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
          }
        }
        return result;
      }
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }

  /**
   * Execute an append mutation.
   * @return result to return to client if default operation should be bypassed as indicated by
   *         RegionObserver, null otherwise
   */
  private Result append(final HRegion region, final OperationQuota quota,
    final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
    checkCellSizeLimit(region, append);
    spaceQuota.getPolicyEnforcement(region).check(append);
    quota.addMutation(append);
    long blockBytesScannedBefore = context != null ?
context.getBlockBytesScanned() : 0; 683 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 684 Result r = region.append(append, nonceGroup, nonce); 685 if (server.getMetrics() != null) { 686 long blockBytesScanned = 687 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 688 server.getMetrics().updateAppend(region, EnvironmentEdgeManager.currentTime() - before, 689 blockBytesScanned); 690 } 691 return r == null ? Result.EMPTY_RESULT : r; 692 } 693 694 /** 695 * Execute an increment mutation. 696 */ 697 private Result increment(final HRegion region, final OperationQuota quota, 698 final MutationProto mutation, final CellScanner cells, long nonceGroup, 699 ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException { 700 long before = EnvironmentEdgeManager.currentTime(); 701 Increment increment = ProtobufUtil.toIncrement(mutation, cells); 702 checkCellSizeLimit(region, increment); 703 spaceQuota.getPolicyEnforcement(region).check(increment); 704 quota.addMutation(increment); 705 long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0; 706 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 707 Result r = region.increment(increment, nonceGroup, nonce); 708 final MetricsRegionServer metricsRegionServer = server.getMetrics(); 709 if (metricsRegionServer != null) { 710 long blockBytesScanned = 711 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 712 metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before, 713 blockBytesScanned); 714 } 715 return r == null ? Result.EMPTY_RESULT : r; 716 } 717 718 /** 719 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when 720 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation. 721 * @param cellsToReturn Could be null. May be allocated in this method. 
This is what this method
 *                      returns as a 'result'.
 * @param closeCallBack the callback to be used with multigets
 * @param context       the current RpcCallContext
 * @return Return the <code>cellScanner</code> passed
 */
  private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
    final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
    final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
    final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
    // one at a time, we instead pass them in batch. Be aware that the corresponding
    // ResultOrException instance that matches each Put or Delete is then added down in the
    // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
    // deferred/batched
    List<ClientProtos.Action> mutations = null;
    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
    IOException sizeIOE = null;
    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
      ResultOrException.newBuilder();
    // Tracks whether the current action produced something to add to the response builder;
    // Puts/Deletes deferred into 'mutations' intentionally leave this false.
    boolean hasResultOrException = false;
    for (ClientProtos.Action action : actions.getActionList()) {
      hasResultOrException = false;
      resultOrExceptionBuilder.clear();
      try {
        Result r = null;
        long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
        if (
          context != null && context.isRetryImmediatelySupported()
            && (context.getResponseCellSize() > maxQuotaResultSize
              || blockBytesScannedBefore + context.getResponseExceptionSize() > maxQuotaResultSize)
        ) {

          // We're storing the exception since the exception and reason string won't
          // change after the response size limit is reached.
          if (sizeIOE == null) {
            // We don't need the stack un-winding so don't throw the exception.
            // Throwing will kill the JVM's JIT.
            //
            // Instead just create the exception and then store it.
            sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
              + context.getResponseCellSize() + " BlockSize: " + blockBytesScannedBefore);

            // Only report the exception once since there's only one request that
            // caused the exception. Otherwise this number will dominate the exceptions count.
            rpcServer.getMetrics().exception(sizeIOE);
          }

          // Now that the exception is known to be created,
          // use it for the response.
          //
          // This will create a copy in the builder.
          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
          resultOrExceptionBuilder.setException(pair);
          context.incrementResponseExceptionSize(pair.getSerializedSize());
          resultOrExceptionBuilder.setIndex(action.getIndex());
          builder.addResultOrException(resultOrExceptionBuilder.build());
          // Still consume this action's cells so the scanner stays aligned with later actions.
          skipCellsForMutation(action, cellScanner);
          continue;
        }
        if (action.hasGet()) {
          long before = EnvironmentEdgeManager.currentTime();
          ClientProtos.Get pbGet = action.getGet();
          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
          // they are; it's a problem for non-native clients like asynchbase. HBASE-20225.
          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
              + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
              + "reverse Scan.");
          }
          try {
            Get get = ProtobufUtil.toGet(pbGet);
            if (context != null) {
              r = get(get, (region), closeCallBack, context);
            } else {
              r = region.get(get);
            }
          } finally {
            // Record get latency/IO even when the get itself threw.
            final MetricsRegionServer metricsRegionServer = server.getMetrics();
            if (metricsRegionServer != null) {
              long blockBytesScanned =
                context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
              metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
                blockBytesScanned);
            }
          }
        } else if (action.hasServiceCall()) {
          hasResultOrException = true;
          Message result = execServiceOnRegion(region, action.getServiceCall());
          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
            ClientProtos.CoprocessorServiceResult.newBuilder();
          resultOrExceptionBuilder.setServiceResult(serviceResultBuilder
            .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName())
              // TODO: Copy!!!
              .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
        } else if (action.hasMutation()) {
          MutationType type = action.getMutation().getMutateType();
          if (
            type != MutationType.PUT && type != MutationType.DELETE && mutations != null
              && !mutations.isEmpty()
          ) {
            // Flush out any Puts or Deletes already collected.
            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
              spaceQuotaEnforcement);
            mutations.clear();
          }
          switch (type) {
            case APPEND:
              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement, context);
              break;
            case INCREMENT:
              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement, context);
              break;
            case PUT:
            case DELETE:
              // Collect the individual mutations and apply in a batch
              if (mutations == null) {
                mutations = new ArrayList<>(actions.getActionCount());
              }
              mutations.add(action);
              break;
            default:
              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
          }
        } else {
          throw new HBaseIOException("Unexpected Action type");
        }
        if (r != null) {
          ClientProtos.Result pbResult = null;
          if (isClientCellBlockSupport(context)) {
            pbResult = ProtobufUtil.toResultNoData(r);
            // Hard to guess the size here. Just make a rough guess.
            if (cellsToReturn == null) {
              cellsToReturn = new ArrayList<>();
            }
            cellsToReturn.add(r);
          } else {
            pbResult = ProtobufUtil.toResult(r);
          }
          addSize(context, r);
          hasResultOrException = true;
          resultOrExceptionBuilder.setResult(pbResult);
        }
        // Could get to here and there was no result and no exception. Presumes we added
        // a Put or Delete to the collecting Mutations List for adding later. In this
        // case the corresponding ResultOrException instance for the Put or Delete will be added
        // down in the doNonAtomicBatchOp method call rather than up here.
      } catch (IOException ie) {
        rpcServer.getMetrics().exception(ie);
        hasResultOrException = true;
        NameBytesPair pair = ResponseConverter.buildException(ie);
        resultOrExceptionBuilder.setException(pair);
        // NOTE(review): unlike the size-limit branch above this call is not guarded by
        // 'context != null'; verify callers always supply a context or this can NPE.
        context.incrementResponseExceptionSize(pair.getSerializedSize());
      }
      if (hasResultOrException) {
        // Propagate index.
        resultOrExceptionBuilder.setIndex(action.getIndex());
        builder.addResultOrException(resultOrExceptionBuilder.build());
      }
    }
    // Finish up any outstanding mutations
    if (!CollectionUtils.isEmpty(mutations)) {
      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
    }
    return cellsToReturn;
  }

  /**
   * Reject any mutation whose largest cell exceeds the region's configured max cell size.
   * @throws DoNotRetryIOException if a cell is over the limit (retrying cannot help)
   */
  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
    if (r.maxCellSize > 0) {
      CellScanner cells = m.cellScanner();
      while (cells.advance()) {
        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
        if (size > r.maxCellSize) {
          String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of "
            + r.maxCellSize + " bytes";
          LOG.debug(msg);
          throw new DoNotRetryIOException(msg);
        }
      }
    }
  }

  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Just throw the exception. The exception will be caught and then added to region-level
    // exception for RegionAction. Leaving the null to action result is ok since the null
    // result is viewed as failure by hbase client. And the region-level exception will be used
    // to replace the null result. see AsyncRequestFutureImpl#receiveMultiAction and
    // AsyncBatchRpcRetryingCaller#onComplete for more details.
doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
  }

  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    try {
      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
        spaceQuotaEnforcement, false);
    } catch (IOException e) {
      // Set the exception for each action. The mutations in the same RegionAction are grouped
      // into different batches and then processed individually. Hence, we don't set the
      // region-level exception here for the whole RegionAction.
      for (Action mutation : mutations) {
        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
      }
    }
  }

  /**
   * Execute a list of mutations.
   */
  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
    throws IOException {
    Mutation[] mArray = new Mutation[mutations.size()];
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      /**
       * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions
       * since the mutation array may have been reordered. In order to return the right result or
       * exception to the corresponding actions, we need to know which action a mutation belongs
       * to. We can't sort the ClientProtos.Action array, since they are bonded to cellscanners.
       */
      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
      int i = 0;
      // At most one nonce is retained; the last Increment/Append carrying a nonce wins.
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : mutations) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto m = action.getMutation();
        Mutation mutation;
        switch (m.getMutateType()) {
          case PUT:
            mutation = ProtobufUtil.toPut(m, cells);
            batchContainsPuts = true;
            break;

          case DELETE:
            mutation = ProtobufUtil.toDelete(m, cells);
            batchContainsDelete = true;
            break;

          case INCREMENT:
            mutation = ProtobufUtil.toIncrement(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          case APPEND:
            mutation = ProtobufUtil.toAppend(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          default:
            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
        }
        mutationActionMap.put(mutation, action);
        mArray[i++] = mutation;
        checkCellSizeLimit(region, mutation);
        // Check if a space quota disallows this mutation
        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
        quota.addMutation(mutation);
      }

      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }

      // HBASE-17924
      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
      // order is preserved as its expected from the client
      if (!atomic) {
        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
      }

      OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);

      // When atomic is true, it indicates that the mutateRow API or the batch API with
      // RowMutations is called. In this case, we need to merge the results of the
      // Increment/Append operations if the mutations include those operations, and set the merged
      // result to the first element of the ResultOrException list
      if (atomic) {
        List<ResultOrException> resultOrExceptions = new ArrayList<>();
        List<Result> results = new ArrayList<>();
        for (i = 0; i < codes.length; i++) {
          if (codes[i].getResult() != null) {
            results.add(codes[i].getResult());
          }
          // Every element except the first gets a default (empty) result placeholder.
          if (i != 0) {
            resultOrExceptions
              .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i));
          }
        }

        if (results.isEmpty()) {
          builder.addResultOrException(
            getResultOrException(ClientProtos.Result.getDefaultInstance(), 0));
        } else {
          // Merge the results of the Increment/Append operations
          List<Cell> cellList = new ArrayList<>();
          for (Result result : results) {
            if (result.rawCells() != null) {
              cellList.addAll(Arrays.asList(result.rawCells()));
            }
          }
          Result result = Result.create(cellList);

          // Set the merged result of the Increment/Append operations to the first element of the
          // ResultOrException list
          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
        }

        builder.addAllResultOrException(resultOrExceptions);
        return;
      }

      for (i = 0; i < codes.length; i++) {
        Mutation currentMutation = mArray[i];
        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
        Exception e;
        // Note: 'default' deliberately precedes SUCCESS and STORE_TOO_BUSY below. This is legal
        // because every arm breaks, but keep it in mind when adding status codes.
        switch (codes[i].getOperationStatusCode()) {
          case BAD_FAMILY:
            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SANITY_CHECK_FAILURE:
            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          default:
            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SUCCESS:
            ClientProtos.Result result = codes[i].getResult() == null
              ? ClientProtos.Result.getDefaultInstance()
              : ProtobufUtil.toResult(codes[i].getResult());
            builder.addResultOrException(getResultOrException(result, index));
            break;

          case STORE_TOO_BUSY:
            e = new RegionTooBusyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;
        }
      }
    } finally {
      int processedMutationIndex = 0;
      for (Action mutation : mutations) {
        // The non-null mArray[i] means the cell scanner has been read.
        if (mArray[processedMutationIndex++] == null) {
          skipCellsForMutation(mutation, cells);
        }
      }
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
    boolean batchContainsDelete) {
    final MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      if (batchContainsPuts) {
        metricsRegionServer.updatePutBatch(region, after - starttime);
      }
      if (batchContainsDelete) {
        metricsRegionServer.updateDeleteBatch(region, after - starttime);
      }
    }
  }

  /**
   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
   *         exceptionMessage if any
   * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying
   *             edits for secondary replicas any more, see
   *             {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}.
*/
  @Deprecated
  private OperationStatus[] doReplayBatchOp(final HRegion region,
    final List<MutationReplay> mutations, long replaySeqId) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
        MutationReplay m = it.next();

        if (m.getType() == MutationType.PUT) {
          batchContainsPuts = true;
        } else {
          batchContainsDelete = true;
        }

        // Entries in the WAL meta family carry markers (compaction, flush, region event,
        // bulk load) rather than user data; replay each marker here and drop the whole
        // entry from the batch below so batchReplay only sees data edits.
        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
        if (metaCells != null && !metaCells.isEmpty()) {
          for (Cell metaCell : metaCells) {
            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
            HRegion hRegion = region;
            if (compactionDesc != null) {
              // replay the compaction. Remove the files from stores only if we are the primary
              // region replica (thus own the files)
              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
                replaySeqId);
              continue;
            }
            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
            // Flush and region-event markers are only replayed on non-default (secondary) replicas.
            if (flushDesc != null && !isDefaultReplica) {
              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
              continue;
            }
            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
            if (regionEvent != null && !isDefaultReplica) {
              hRegion.replayWALRegionEventMarker(regionEvent);
              continue;
            }
            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
            if (bulkLoadEvent != null) {
              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
              continue;
            }
          }
          it.remove();
        }
      }
      requestCount.increment();
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
        replaySeqId);
    } finally {
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  private void closeAllScanners() {
    // Close any outstanding scanners. Means they'll get an UnknownScanner
    // exception next time they come in.
for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
      try {
        e.getValue().s.close();
      } catch (IOException ioe) {
        // Best-effort shutdown: log and keep closing the remaining scanners.
        LOG.warn("Closing scanner " + e.getKey(), ioe);
      }
    }
  }

  // Directly invoked only for testing
  public RSRpcServices(final HRegionServer rs) throws IOException {
    super(rs, rs.getProcessName());
    final Configuration conf = rs.getConfiguration();
    setReloadableGuardrails(conf);
    // The scanner lease period also drives the expiry of the closedScanners cache below.
    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
    rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
    closedScanners = CacheBuilder.newBuilder()
      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }

  @Override
  protected boolean defaultReservoirEnabled() {
    return true;
  }

  @Override
  protected ServerType getDNSServerType() {
    return DNS.ServerType.REGIONSERVER;
  }

  @Override
  protected String getHostname(Configuration conf, String defaultHostname) {
    return conf.get("hbase.regionserver.ipc.address", defaultHostname);
  }

  @Override
  protected String getPortConfigName() {
    return HConstants.REGIONSERVER_PORT;
  }

  @Override
  protected int getDefaultPort() {
    return HConstants.DEFAULT_REGIONSERVER_PORT;
  }

  @Override
  protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  protected RpcServerInterface createRpcServer(final Server server,
    final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
    final String name) throws IOException {
    final Configuration conf = server.getConfiguration();
    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
    try {
      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, // use final
                                                                                        // bindAddress
                                                                                        // for this
                                                                                        // server.
        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
    } catch (BindException be) {
      // Surface the actionable config key; keep the original cause for diagnosis.
      throw new IOException(be.getMessage() + ". To switch ports use the '"
        + HConstants.REGIONSERVER_PORT + "' configuration property.",
        be.getCause() != null ? be.getCause() : be);
    }
  }

  // NOTE(review): duplicates the lookup in getRpcSchedulerFactoryClass(Configuration) above.
  // Consider delegating, but beware: the Configuration-taking variant is virtual and may be
  // overridden by subclasses, which would change this method's behavior.
  protected Class<?> getRpcSchedulerFactoryClass() {
    final Configuration conf = server.getConfiguration();
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  protected PriorityFunction createPriority() {
    return new RSAnnotationReadingPriorityFunction(this);
  }

  public int getScannersCount() {
    return scanners.size();
  }

  /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
  RegionScanner getScanner(long scannerId) {
    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
    return rsh == null ? null : rsh.s;
  }

  /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null.
*/
  private RegionScannerHolder getRegionScannerHolder(long scannerId) {
    // The scanners map is keyed by scanner name; ids are translated via toScannerName().
    return scanners.get(toScannerName(scannerId));
  }

  /**
   * Returns a human-readable "table/region/operation_id" summary for the given scanner id, or
   * null if the scanner is unknown.
   */
  public String getScanDetailsWithId(long scannerId) {
    RegionScanner scanner = getScanner(scannerId);
    if (scanner == null) {
      return null;
    }
    StringBuilder builder = new StringBuilder();
    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
    builder.append(" operation_id: ").append(scanner.getOperationId());
    return builder.toString();
  }

  /**
   * Returns a human-readable summary for the table/region of a ScanRequest (plus the operation id
   * attribute when present), or null if the request names no region or the region is not found.
   */
  public String getScanDetailsWithRequest(ScanRequest request) {
    try {
      if (!request.hasRegion()) {
        return null;
      }
      Region region = getRegion(request.getRegion());
      StringBuilder builder = new StringBuilder();
      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
      for (NameBytesPair pair : request.getScan().getAttributeList()) {
        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
          break;
        }
      }
      return builder.toString();
    } catch (IOException ignored) {
      // Best-effort description only; an unresolvable region yields no details.
      return null;
    }
  }

  /**
   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
   */
  long getScannerVirtualTime(long scannerId) {
    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
    return rsh == null ? 0L : rsh.getNextCallSeq();
  }

  /**
   * Method to account for the size of retained cells.
   * @param context rpc call context
   * @param r       result to add size.
1322 */ 1323 void addSize(RpcCallContext context, Result r) { 1324 if (context != null && r != null && !r.isEmpty()) { 1325 for (Cell c : r.rawCells()) { 1326 context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c)); 1327 } 1328 } 1329 } 1330 1331 /** Returns Remote client's ip and port else null if can't be determined. */ 1332 @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices", 1333 link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java") 1334 static String getRemoteClientIpAndPort() { 1335 RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null); 1336 if (rpcCall == null) { 1337 return HConstants.EMPTY_STRING; 1338 } 1339 InetAddress address = rpcCall.getRemoteAddress(); 1340 if (address == null) { 1341 return HConstants.EMPTY_STRING; 1342 } 1343 // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name 1344 // resolution. Just use the IP. It is generally a smaller amount of info to keep around while 1345 // scanning than a hostname anyways. 1346 return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString(); 1347 } 1348 1349 /** Returns Remote client's username. 
*/ 1350 @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices", 1351 link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java") 1352 static String getUserName() { 1353 RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null); 1354 if (rpcCall == null) { 1355 return HConstants.EMPTY_STRING; 1356 } 1357 return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING); 1358 } 1359 1360 private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper, 1361 HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException { 1362 Lease lease = server.getLeaseManager().createLease(scannerName, this.scannerLeaseTimeoutPeriod, 1363 new ScannerListener(scannerName)); 1364 RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease); 1365 RpcCallback closeCallback = 1366 s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s); 1367 RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback, 1368 needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName()); 1369 RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh); 1370 assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! 
" 1371 + scannerName + ", " + existing; 1372 return rsh; 1373 } 1374 1375 private boolean isFullRegionScan(Scan scan, HRegion region) { 1376 // If the scan start row equals or less than the start key of the region 1377 // and stop row greater than equals end key (if stop row present) 1378 // or if the stop row is empty 1379 // account this as a full region scan 1380 if ( 1381 Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0 1382 && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0 1383 && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW) 1384 || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) 1385 ) { 1386 return true; 1387 } 1388 return false; 1389 } 1390 1391 /** 1392 * Find the HRegion based on a region specifier 1393 * @param regionSpecifier the region specifier 1394 * @return the corresponding region 1395 * @throws IOException if the specifier is not null, but failed to find the region 1396 */ 1397 public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException { 1398 return server.getRegion(regionSpecifier.getValue().toByteArray()); 1399 } 1400 1401 /** 1402 * Find the List of HRegions based on a list of region specifiers 1403 * @param regionSpecifiers the list of region specifiers 1404 * @return the corresponding list of regions 1405 * @throws IOException if any of the specifiers is not null, but failed to find the region 1406 */ 1407 private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers, 1408 final CacheEvictionStatsBuilder stats) { 1409 List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size()); 1410 for (RegionSpecifier regionSpecifier : regionSpecifiers) { 1411 try { 1412 regions.add(server.getRegion(regionSpecifier.getValue().toByteArray())); 1413 } catch (NotServingRegionException e) { 1414 stats.addException(regionSpecifier.getValue().toByteArray(), e); 1415 } 1416 } 1417 return regions; 1418 } 
PriorityFunction getPriority() {
    return priority;
  }

  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return server.getRegionServerRpcQuotaManager();
  }

  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return server.getRegionServerSpaceQuotaManager();
  }

  void start(ZKWatcher zkWatcher) {
    // NOTE(review): the id generator is seeded with the server name — presumably for uniqueness
    // across restarts; confirm in ScannerIdGenerator.
    this.scannerIdGenerator = new ScannerIdGenerator(this.server.getServerName());
    internalStart(zkWatcher);
  }

  void stop() {
    closeAllScanners();
    internalStop();
  }

  /**
   * Called to verify that this server is up and running.
   */
  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
  protected void checkOpen() throws IOException {
    // Checked in order: aborted, stopped, filesystem down, then not-yet-online.
    if (server.isAborted()) {
      throw new RegionServerAbortedException("Server " + server.getServerName() + " aborting");
    }
    if (server.isStopped()) {
      throw new RegionServerStoppedException("Server " + server.getServerName() + " stopping");
    }
    if (!server.isDataFileSystemOk()) {
      throw new RegionServerStoppedException("File system not available");
    }
    if (!server.isOnline()) {
      throw new ServerNotRunningYetException(
        "Server " + server.getServerName() + " is not running yet");
    }
  }

  /**
   * By default, put up an Admin and a Client Service. Set booleans
   * <code>hbase.regionserver.admin.executorService</code> and
   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
   * Default is that both are enabled.
1467 * @return immutable list of blocking services and the security info classes that this server 1468 * supports 1469 */ 1470 protected List<BlockingServiceAndInterface> getServices() { 1471 boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true); 1472 boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true); 1473 boolean clientMeta = 1474 getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true); 1475 boolean bootstrapNodes = 1476 getConfiguration().getBoolean(REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG, true); 1477 List<BlockingServiceAndInterface> bssi = new ArrayList<>(); 1478 if (client) { 1479 bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this), 1480 ClientService.BlockingInterface.class)); 1481 } 1482 if (admin) { 1483 bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this), 1484 AdminService.BlockingInterface.class)); 1485 } 1486 if (clientMeta) { 1487 bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this), 1488 ClientMetaService.BlockingInterface.class)); 1489 } 1490 if (bootstrapNodes) { 1491 bssi.add( 1492 new BlockingServiceAndInterface(BootstrapNodeService.newReflectiveBlockingService(this), 1493 BootstrapNodeService.BlockingInterface.class)); 1494 } 1495 return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build(); 1496 } 1497 1498 /** 1499 * Close a region on the region server. 1500 * @param controller the RPC controller 1501 * @param request the request 1502 */ 1503 @Override 1504 @QosPriority(priority = HConstants.ADMIN_QOS) 1505 public CloseRegionResponse closeRegion(final RpcController controller, 1506 final CloseRegionRequest request) throws ServiceException { 1507 final ServerName sn = (request.hasDestinationServer() 1508 ? 
ProtobufUtil.toServerName(request.getDestinationServer()) 1509 : null); 1510 1511 try { 1512 checkOpen(); 1513 throwOnWrongStartCode(request); 1514 final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion()); 1515 1516 requestCount.increment(); 1517 if (sn == null) { 1518 LOG.info("Close " + encodedRegionName + " without moving"); 1519 } else { 1520 LOG.info("Close " + encodedRegionName + ", moving to " + sn); 1521 } 1522 boolean closed = server.closeRegion(encodedRegionName, false, sn); 1523 CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed); 1524 return builder.build(); 1525 } catch (IOException ie) { 1526 throw new ServiceException(ie); 1527 } 1528 } 1529 1530 /** 1531 * Compact a region on the region server. 1532 * @param controller the RPC controller 1533 * @param request the request 1534 */ 1535 @Override 1536 @QosPriority(priority = HConstants.ADMIN_QOS) 1537 public CompactRegionResponse compactRegion(final RpcController controller, 1538 final CompactRegionRequest request) throws ServiceException { 1539 try { 1540 checkOpen(); 1541 requestCount.increment(); 1542 HRegion region = getRegion(request.getRegion()); 1543 // Quota support is enabled, the requesting user is not system/super user 1544 // and a quota policy is enforced that disables compactions. 
1545 if ( 1546 QuotaUtil.isQuotaEnabled(getConfiguration()) 1547 && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null)) 1548 && this.server.getRegionServerSpaceQuotaManager() 1549 .areCompactionsDisabled(region.getTableDescriptor().getTableName()) 1550 ) { 1551 throw new DoNotRetryIOException( 1552 "Compactions on this region are " + "disabled due to a space quota violation."); 1553 } 1554 region.startRegionOperation(Operation.COMPACT_REGION); 1555 LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString()); 1556 boolean major = request.hasMajor() && request.getMajor(); 1557 if (request.hasFamily()) { 1558 byte[] family = request.getFamily().toByteArray(); 1559 String log = "User-triggered " + (major ? "major " : "") + "compaction for region " 1560 + region.getRegionInfo().getRegionNameAsString() + " and family " 1561 + Bytes.toString(family); 1562 LOG.trace(log); 1563 region.requestCompaction(family, log, Store.PRIORITY_USER, major, 1564 CompactionLifeCycleTracker.DUMMY); 1565 } else { 1566 String log = "User-triggered " + (major ? 
"major " : "") + "compaction for region " 1567 + region.getRegionInfo().getRegionNameAsString(); 1568 LOG.trace(log); 1569 region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY); 1570 } 1571 return CompactRegionResponse.newBuilder().build(); 1572 } catch (IOException ie) { 1573 throw new ServiceException(ie); 1574 } 1575 } 1576 1577 @Override 1578 @QosPriority(priority = HConstants.ADMIN_QOS) 1579 public CompactionSwitchResponse compactionSwitch(RpcController controller, 1580 CompactionSwitchRequest request) throws ServiceException { 1581 rpcPreCheck("compactionSwitch"); 1582 final CompactSplit compactSplitThread = server.getCompactSplitThread(); 1583 requestCount.increment(); 1584 boolean prevState = compactSplitThread.isCompactionsEnabled(); 1585 CompactionSwitchResponse response = 1586 CompactionSwitchResponse.newBuilder().setPrevState(prevState).build(); 1587 if (prevState == request.getEnabled()) { 1588 // passed in requested state is same as current state. No action required 1589 return response; 1590 } 1591 compactSplitThread.switchCompaction(request.getEnabled()); 1592 return response; 1593 } 1594 1595 /** 1596 * Flush a region on the region server. 
1597 * @param controller the RPC controller 1598 * @param request the request 1599 */ 1600 @Override 1601 @QosPriority(priority = HConstants.ADMIN_QOS) 1602 public FlushRegionResponse flushRegion(final RpcController controller, 1603 final FlushRegionRequest request) throws ServiceException { 1604 try { 1605 checkOpen(); 1606 requestCount.increment(); 1607 HRegion region = getRegion(request.getRegion()); 1608 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); 1609 boolean shouldFlush = true; 1610 if (request.hasIfOlderThanTs()) { 1611 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs(); 1612 } 1613 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); 1614 if (shouldFlush) { 1615 boolean writeFlushWalMarker = 1616 request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false; 1617 // Go behind the curtain so we can manage writing of the flush WAL marker 1618 HRegion.FlushResultImpl flushResult = null; 1619 if (request.hasFamily()) { 1620 List families = new ArrayList(); 1621 families.add(request.getFamily().toByteArray()); 1622 flushResult = 1623 region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1624 } else { 1625 flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1626 } 1627 boolean compactionNeeded = flushResult.isCompactionNeeded(); 1628 if (compactionNeeded) { 1629 server.getCompactSplitThread().requestSystemCompaction(region, 1630 "Compaction through user triggered flush"); 1631 } 1632 builder.setFlushed(flushResult.isFlushSucceeded()); 1633 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker); 1634 } 1635 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores()); 1636 return builder.build(); 1637 } catch (DroppedSnapshotException ex) { 1638 // Cache flush can fail in a few places. 
If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server.
      server.abort("Replay of WAL required. Forcing server shutdown", ex);
      throw new ServiceException(ex);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /** Lists the RegionInfo of every region currently online on this server, sorted. */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
    final GetOnlineRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      Map<String, HRegion> onlineRegions = server.getOnlineRegions();
      List<RegionInfo> list = new ArrayList<>(onlineRegions.size());
      for (HRegion region : onlineRegions.values()) {
        list.add(region.getRegionInfo());
      }
      list.sort(RegionInfo.COMPARATOR);
      return ResponseConverter.buildGetOnlineRegionResponse(list);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  // Master implementation of this Admin Service differs given it is not
  // able to supply detail only known to RegionServer. See note on
  // MasterRpcServers#getRegionInfo.
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
    final GetRegionInfoRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      RegionInfo info = region.getRegionInfo();
      byte[] bestSplitRow;
      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
        bestSplitRow = region.checkSplit(true).orElse(null);
        // when all table data are in memstore, bestSplitRow = null
        // try to flush region first
        if (bestSplitRow == null) {
          region.flush(true);
          bestSplitRow = region.checkSplit(true).orElse(null);
        }
      } else {
        bestSplitRow = null;
      }
      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
      if (request.hasCompactionState() && request.getCompactionState()) {
        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
      }
      builder.setSplittable(region.isSplittable());
      builder.setMergeable(region.isMergeable());
      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  // Builds RegionLoad for either the regions of one table or all regions on this server.
  // NOTE(review): unlike most admin RPCs here, this one does not call checkOpen() first — confirm
  // whether that is intentional.
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
    throws ServiceException {

    List<HRegion> regions;
    if (request.hasTableName()) {
      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
      regions = server.getRegions(tableName);
    } else {
      regions = server.getRegions();
    }
    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
    // Builders are reused across regions by createRegionLoad.
    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();

    try {
      for (HRegion region : regions) {
        rLoads.add(server.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
      }
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
    builder.addAllRegionLoads(rLoads);
    return builder.build();
  }

  // Clears the requested compaction queues ("long"/"short"). The clearCompactionQueues atomic
  // boolean guarantees only one admin clears the queues at a time; concurrent requests are
  // logged and ignored.
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
    ClearCompactionQueuesRequest request) throws ServiceException {
    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
      + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
    requestCount.increment();
    if (clearCompactionQueues.compareAndSet(false, true)) {
      final CompactSplit compactSplitThread = server.getCompactSplitThread();
      try {
        checkOpen();
        server.getRegionServerCoprocessorHost().preClearCompactionQueues();
        for (String queueName : request.getQueueNameList()) {
          LOG.debug("clear " + queueName + " compaction queue");
          switch (queueName) {
            case "long":
              compactSplitThread.clearLongCompactionsQueue();
              break;
            case "short":
              compactSplitThread.clearShortCompactionsQueue();
              break;
            default:
              LOG.warn("Unknown queue name " + queueName);
              throw new IOException("Unknown queue name " + queueName);
          }
        }
        server.getRegionServerCoprocessorHost().postClearCompactionQueues();
      } catch (IOException ie) {
        throw new ServiceException(ie);
      } finally {
        // Always release the guard so later requests can proceed.
        clearCompactionQueues.set(false);
      }
    } else {
      LOG.warn("Clear compactions queue is executing by other admin.");
    }
    return respBuilder.build();
  }

  /**
   * Get some information of the region server.
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetServerInfoResponse getServerInfo(final RpcController controller,
    final GetServerInfoRequest request) throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
    requestCount.increment();
    // -1 signals "no info server".
    int infoPort = server.getInfoServer() != null ? server.getInfoServer().getPort() : -1;
    return ResponseConverter.buildGetServerInfoResponse(server.getServerName(), infoPort);
  }

  /** Lists store file names for the requested families (all families when none are given). */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetStoreFileResponse getStoreFile(final RpcController controller,
    final GetStoreFileRequest request) throws ServiceException {
    try {
      checkOpen();
      HRegion region = getRegion(request.getRegion());
      requestCount.increment();
      Set<byte[]> columnFamilies;
      if (request.getFamilyCount() == 0) {
        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
      } else {
        // byte[] has identity equals; a raw-comparator TreeSet dedupes by content.
        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
        for (ByteString cf : request.getFamilyList()) {
          columnFamilies.add(cf.toByteArray());
        }
      }
      int nCF = columnFamilies.size();
      List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
      builder.addAllStoreFile(fileList);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  // Validates the start code of an open request; requests without a start code are tolerated
  // (logged) for compatibility.
  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  // Validates the start code of a close request; see the open-request overload above.
  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
    // check that we are the same server that this RPC is intended for.
    if (server.getServerName().getStartcode() != serverStartCode) {
      throw new ServiceException(new DoNotRetryIOException(
        "This RPC was intended for a " + "different server with startCode: " + serverStartCode
          + ", this server is: " + server.getServerName()));
    }
  }

  // Validates start codes for every open/close sub-request of a batched procedure request.
  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
    if (req.getOpenRegionCount() > 0) {
      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
        throwOnWrongStartCode(openReq);
      }
    }
    if (req.getCloseRegionCount() > 0) {
      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
        throwOnWrongStartCode(closeReq);
      }
    }
  }

  /**
   * Open asynchronously a region or a set of regions on the region server. The opening is
   * coordinated by ZooKeeper, and this method requires the znode to be created before being called.
   * As a consequence, this method should be called only from the master.
   * <p>
   * Different manages states for the region are:
   * </p>
   * <ul>
   * <li>region not opened: the region opening will start asynchronously.</li>
   * <li>a close is already in progress: this is considered as an error.</li>
   * <li>an open is already in progress: this new open request will be ignored. 
This is important
   * because the Master can do multiple requests if it crashes.</li>
   * <li>the region is already opened: this new open request will be ignored.</li>
   * </ul>
   * <p>
   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
   * errors are put in the response as FAILED_OPENING.
   * </p>
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public OpenRegionResponse openRegion(final RpcController controller,
    final OpenRegionRequest request) throws ServiceException {
    requestCount.increment();
    throwOnWrongStartCode(request);

    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
    final int regionCount = request.getOpenInfoCount();
    // Cache of table descriptors so a bulk assign looks each table up only once.
    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
    final boolean isBulkAssign = regionCount > 1;
    try {
      checkOpen();
    } catch (IOException ie) {
      // A single meta-region open is allowed to wait for the server to come online; any other
      // open fails immediately when the server is not open.
      TableName tableName = null;
      if (regionCount == 1) {
        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
          request.getOpenInfo(0).getRegion();
        if (ri != null) {
          tableName = ProtobufUtil.toTableName(ri.getTableName());
        }
      }
      if (!TableName.META_TABLE_NAME.equals(tableName)) {
        throw new ServiceException(ie);
      }
      // We are assigning meta, wait a little for regionserver to finish initialization.
      // Default to quarter of RPC timeout
      int timeout = server.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
      synchronized (server.online) {
        try {
          while (
            EnvironmentEdgeManager.currentTime() <= endTime && !server.isStopped()
              && !server.isOnline()
          ) {
            server.online.wait(server.getMsgInterval());
          }
          checkOpen();
        } catch (InterruptedException t) {
          Thread.currentThread().interrupt();
          throw new ServiceException(t);
        } catch (IOException e) {
          throw new ServiceException(e);
        }
      }
    }

    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;

    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableDescriptor htd;
      try {
        String encodedName = region.getEncodedName();
        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
        final HRegion onlineRegion = server.getRegion(encodedName);
        if (onlineRegion != null) {
          // The region is already online. This should not happen any more.
          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
            + ", which is already online";
          LOG.warn(error);
          // server.abort(error);
          // throw new IOException(error);
          builder.addOpeningState(RegionOpeningState.OPENED);
          continue;
        }
        LOG.info("Open " + region.getRegionNameAsString());

        // In the regions-in-transition map, TRUE marks an open in progress and FALSE a close in
        // progress (see the branches below).
        final Boolean previous =
          server.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);

        if (Boolean.FALSE.equals(previous)) {
          if (server.getRegion(encodedName) != null) {
            // There is a close in progress. This should not happen any more.
            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
              + ", which we are already trying to CLOSE";
            server.abort(error);
            throw new IOException(error);
          }
          server.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
        }

        if (Boolean.TRUE.equals(previous)) {
          // An open is in progress. This is supported, but let's log this.
          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
            + ", which we are already trying to OPEN"
            + " - ignoring this new request for this region.");
        }

        // We are opening this region. If it moves back and forth for whatever reason, we don't
        // want to keep returning the stale moved record while we are opening/if we close again.
        server.removeFromMovedRegions(region.getEncodedName());

        if (previous == null || !previous.booleanValue()) {
          htd = htds.get(region.getTable());
          if (htd == null) {
            htd = server.getTableDescriptors().get(region.getTable());
            htds.put(region.getTable(), htd);
          }
          if (htd == null) {
            throw new IOException("Missing table descriptor for " + region.getEncodedName());
          }
          // If there is no action in progress, we can submit a specific handler.
          // Need to pass the expected version in the constructor.
          if (server.getExecutorService() == null) {
            LOG.info("No executor executorService; skipping open request");
          } else {
            if (region.isMetaRegion()) {
              server.getExecutorService()
                .submit(new OpenMetaHandler(server, server, region, htd, masterSystemTime));
            } else {
              if (regionOpenInfo.getFavoredNodesCount() > 0) {
                server.updateRegionFavoredNodesMapping(region.getEncodedName(),
                  regionOpenInfo.getFavoredNodesList());
              }
              // High-priority tables and system tables open on the priority handler pool.
              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
                server.getExecutorService().submit(
                  new OpenPriorityRegionHandler(server, server, region, htd, masterSystemTime));
              } else {
                server.getExecutorService()
                  .submit(new OpenRegionHandler(server, server, region, htd, masterSystemTime));
              }
            }
          }
        }

        builder.addOpeningState(RegionOpeningState.OPENED);
      } catch (IOException ie) {
        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
        if (isBulkAssign) {
          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
        } else {
          throw new ServiceException(ie);
        }
      }
    }
    return builder.build();
  }

  /**
   * Warmup a region on this server. This method should only be called by Master. It synchronously
   * opens the region and closes the region bringing the most important pages in cache.
   */
  @Override
  public WarmupRegionResponse warmupRegion(final RpcController controller,
    final WarmupRegionRequest request) throws ServiceException {
    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
    try {
      checkOpen();
      String encodedName = region.getEncodedName();
      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
      final HRegion onlineRegion = server.getRegion(encodedName);
      if (onlineRegion != null) {
        // Already serving the region; nothing to warm.
        LOG.info("{} is online; skipping warmup", region);
        return response;
      }
      TableDescriptor htd = server.getTableDescriptors().get(region.getTable());
      if (server.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
        LOG.info("{} is in transition; skipping warmup", region);
        return response;
      }
      LOG.info("Warmup {}", region.getRegionNameAsString());
      HRegion.warmupHRegion(region, htd, server.getWAL(region), server.getConfiguration(), server,
        null);
    } catch (IOException ie) {
      LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
      throw new ServiceException(ie);
    }

    return response;
  }

  /**
   * Detaches and returns the cell scanner carried by the RPC controller. The controller's
   * reference is nulled out so the cells are consumed exactly once.
   */
  private CellScanner getAndReset(RpcController controller) {
    HBaseRpcController hrc = (HBaseRpcController) controller;
    CellScanner cells = hrc.cellScanner();
    hrc.setCellScanner(null);
    return cells;
  }

  /**
   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
   * that the given mutations will be durable on the receiving RS if this method returns without any
   * exception.
   * @param controller the RPC controller
   * @param request the request
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for
   *             compatibility with old region replica implementation. 
Now we will use
   *             {@code replicateToReplica} method instead.
   */
  @Deprecated
  @Override
  @QosPriority(priority = HConstants.REPLAY_QOS)
  public ReplicateWALEntryResponse replay(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    CellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      // All entries in one request must target the same region (checked in the loop below).
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      RegionCoprocessorHost coprocessorHost =
        ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
          ? region.getCoprocessorHost()
          : null; // do not invoke coprocessors if this is a secondary region replica
      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();

      // Skip adding the edits to WAL if this is a secondary region replica
      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;

      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException("Replay request contains entries from multiple "
            + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
            + entry.getKey().getEncodedRegionName());
        }
        if (server.nonceManager != null && isPrimary) {
          // Record nonces seen in the WAL so retried client operations are not re-applied.
          long nonceGroup =
            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          server.nonceManager.reportOperationFromWal(nonceGroup, nonce,
            entry.getKey().getWriteTime());
        }
        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
        List<MutationReplay> edits =
          WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
        if (coprocessorHost != null) {
          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
          // KeyValue.
          if (
            coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
              walEntry.getSecond())
          ) {
            // if bypass this log entry, ignore it ...
            continue;
          }
          walEntries.add(walEntry);
        }
        if (edits != null && !edits.isEmpty()) {
          // HBASE-17924
          // sort to improve lock efficiency
          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
          long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
            ? entry.getKey().getOrigSequenceNumber()
            : entry.getKey().getLogSequenceNumber();
          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
          // check if it's a partial success
          for (int i = 0; result != null && i < result.length; i++) {
            if (result[i] != OperationStatus.SUCCESS) {
              throw new IOException(result[i].getExceptionMsg());
            }
          }
        }
      }

      // sync wal at the end because ASYNC_WAL is used above
      WAL wal = region.getWAL();
      if (wal != null) {
        wal.sync();
      }

      if (coprocessorHost != null) {
        for (Pair<WALKey, WALEdit> entry : walEntries) {
          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
            entry.getSecond());
        }
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Replay latency is recorded whether the call succeeded or not.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
      }
    }
  }

  /**
   * Replay the given changes on a secondary replica
   */
  @Override
  public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
    ReplicateWALEntryRequest request) throws ServiceException {
    CellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
        // This RPC is for secondary replicas only.
        throw new DoNotRetryIOException(
          "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
      }
      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException(
            "ReplicateToReplica request contains entries from multiple " + "regions. First region:"
              + regionName.toStringUtf8() + " , other region:"
              + entry.getKey().getEncodedRegionName());
        }
        region.replayWALEntry(entry, cells);
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  // Rejects incoming replication when the sink cluster's sync replication state for the target
  // table forbids applying remote edits.
  private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
    ReplicationSourceService replicationSource = server.getReplicationSourceService();
    if (replicationSource == null || entries.isEmpty()) {
      return;
    }
    // We can ensure that all entries are for one peer, so only need to check one entry's
    // table name. if the table hit sync replication at peer side and the peer cluster
    // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply
    // those entries according to the design doc.
    TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray());
    if (
      replicationSource.getSyncReplicationPeerInfoProvider().checkState(table,
        RejectReplicationRequestStateChecker.get())
    ) {
      throw new DoNotRetryIOException(
        "Reject to apply to sink cluster because sync replication state of sink cluster "
          + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table);
    }
  }

  /**
   * Replicate WAL entries on the region server.
2223 * @param controller the RPC controller 2224 * @param request the request 2225 */ 2226 @Override 2227 @QosPriority(priority = HConstants.REPLICATION_QOS) 2228 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, 2229 final ReplicateWALEntryRequest request) throws ServiceException { 2230 try { 2231 checkOpen(); 2232 if (server.getReplicationSinkService() != null) { 2233 requestCount.increment(); 2234 List<WALEntry> entries = request.getEntryList(); 2235 checkShouldRejectReplicationRequest(entries); 2236 CellScanner cellScanner = getAndReset(controller); 2237 server.getRegionServerCoprocessorHost().preReplicateLogEntries(); 2238 server.getReplicationSinkService().replicateLogEntries(entries, cellScanner, 2239 request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), 2240 request.getSourceHFileArchiveDirPath()); 2241 server.getRegionServerCoprocessorHost().postReplicateLogEntries(); 2242 return ReplicateWALEntryResponse.newBuilder().build(); 2243 } else { 2244 throw new ServiceException("Replication services are not initialized yet"); 2245 } 2246 } catch (IOException ie) { 2247 throw new ServiceException(ie); 2248 } 2249 } 2250 2251 /** 2252 * Roll the WAL writer of the region server. 
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public RollWALWriterResponse rollWALWriter(final RpcController controller,
    final RollWALWriterRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      // Coprocessor hooks bracket the roll request; the roll itself is asynchronous — we only
      // ask the roller to roll all WALs, we do not wait for completion here.
      server.getRegionServerCoprocessorHost().preRollWALWriterRequest();
      server.getWalRoller().requestRollAll();
      server.getRegionServerCoprocessorHost().postRollWALWriterRequest();
      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Stop the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public StopServerResponse stopServer(final RpcController controller,
    final StopServerRequest request) throws ServiceException {
    rpcPreCheck("stopServer");
    requestCount.increment();
    String reason = request.getReason();
    // Initiates shutdown; the response is sent before the server actually goes down.
    server.stop(reason);
    return StopServerResponse.newBuilder().build();
  }

  /**
   * Update the favored-nodes mapping for the regions named in the request. Regions with no
   * favored nodes in their update entry are skipped; the response carries the total number of
   * entries received (including skipped ones).
   */
  @Override
  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
    UpdateFavoredNodesRequest request) throws ServiceException {
    rpcPreCheck("updateFavoredNodes");
    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
        server.updateRegionFavoredNodesMapping(hri.getEncodedName(),
          regionUpdateInfo.getFavoredNodesList());
      }
    }
    respBuilder.setResponse(openInfoList.size());
    return respBuilder.build();
  }

  /**
   * Atomically bulk load several HFiles into an open region
   * @return true if successful, false is failed but recoverably (no action)
   * @throws ServiceException if failed unrecoverably
   */
  @Override
  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
    final BulkLoadHFileRequest request) throws ServiceException {
    long start = EnvironmentEdgeManager.currentTime();
    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
    // If our own cluster id is already in the list the load has cycled back to us (replication
    // loop); report success without re-loading. Otherwise add ourselves for downstream loops.
    if (clusterIds.contains(this.server.getClusterId())) {
      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
    } else {
      clusterIds.add(this.server.getClusterId());
    }
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
      // -1 means "size unknown / quota not computed"; see the negative-size note below.
      long sizeToBeLoaded = -1;

      // Check to see if this bulk load would exceed the space quota for this table
      if (spaceQuotaEnabled) {
        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
        SpaceViolationPolicyEnforcement enforcement =
          activeSpaceQuotas.getPolicyEnforcement(region);
        if (enforcement != null) {
          // Bulk loads must still be atomic. We must enact all or none.
          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
          for (FamilyPath familyPath : request.getFamilyPathList()) {
            filePaths.add(familyPath.getPath());
          }
          // Check if the batch of files exceeds the current quota
          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
        }
      }
      // secure bulk load
      Map<byte[], List<Path>> map =
        server.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
      builder.setLoaded(map != null);
      if (map != null) {
        // Treat any negative size as a flag to "ignore" updating the region size as that is
        // not possible to occur in real life (cannot bulk load a file with negative size)
        // NOTE(review): the guard is "> 0", so a zero-byte load is also skipped — presumably
        // intentional since there is nothing to account; confirm against quota semantics.
        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
              + sizeToBeLoaded + " bytes");
          }
          // Inform space quotas of the new files for this region
          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
            sizeToBeLoaded);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record latency even on failure paths.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
      }
    }
  }

  /**
   * Obtain a bulk-load token for the given region from the secure bulk load manager.
   */
  @Override
  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
    PrepareBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      String bulkToken = server.getSecureBulkLoadManager().prepareBulkLoad(region, request);
      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
      builder.setBulkToken(bulkToken);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Release the resources associated with a previously prepared bulk load.
   */
  @Override
  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
    CleanupBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      server.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
      return CleanupBulkLoadResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Execute a region-scoped coprocessor endpoint call and wrap its serialized result in the
   * RPC response.
   */
  @Override
  public CoprocessorServiceResponse execService(final RpcController controller,
    final CoprocessorServiceRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      Message result = execServiceOnRegion(region, request.getCall());
      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
        region.getRegionInfo().getRegionName()));
      // TODO: COPIES!!!!!!
      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
        org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Resolve the filesystem to read bulk-load files from: the server's own filesystem when no
   * paths are given, otherwise the filesystem of the first path (all paths are assumed to live
   * on the same filesystem — TODO confirm).
   */
  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
    if (filePaths.isEmpty()) {
      // local hdfs
      return server.getFileSystem();
    }
    // source hdfs
    return new Path(filePaths.get(0)).getFileSystem(server.getConfiguration());
  }

  /** Dispatch a coprocessor service call to the region with a fresh server-side controller. */
  private Message execServiceOnRegion(HRegion region,
    final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
    // ignore the passed in controller (from the serialized call)
    ServerRpcController execController = new ServerRpcController();
    return region.execService(execController, serviceCall);
  }

  /**
   * True when this region's table is in a sync-replication state (STANDBY) that must reject
   * normal client requests.
   */
  private boolean shouldRejectRequestsFromClient(HRegion region) {
    TableName table = region.getRegionInfo().getTable();
    ReplicationSourceService service = server.getReplicationSourceService();
    return service != null && service.getSyncReplicationPeerInfoProvider().checkState(table,
      RejectRequestsFromClientStateChecker.get());
  }

  /** Throw a DoNotRetryIOException if the region is in STANDBY sync-replication state. */
  private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
    if (shouldRejectRequestsFromClient(region)) {
      throw new DoNotRetryIOException(
        region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state.");
    }
  }

  /**
   * Get data from a table.
   * @param controller the RPC controller
   * @param request    the get request
   */
  @Override
  public GetResponse get(final RpcController controller, final GetRequest request)
    throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    OperationQuota quota = null;
    HRegion region = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    try {
      checkOpen();
      requestCount.increment();
      rpcGetRequestCount.increment();
      region = getRegion(request.getRegion());
      rejectIfInStandByState(region);

      GetResponse.Builder builder = GetResponse.newBuilder();
      ClientProtos.Get get = request.getGet();
      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
          + "reverse Scan.");
      }
      Boolean existence = null;
      Result r = null;
      quota = getRpcQuotaManager().checkBatchQuota(region, OperationQuota.OperationType.GET);

      Get clientGet = ProtobufUtil.toGet(get);
      // existence-only gets may be answered entirely by the preExists coprocessor hook
      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
        existence = region.getCoprocessorHost().preExists(clientGet);
      }
      if (existence == null) {
        if (context != null) {
          r = get(clientGet, (region), null, context);
        } else {
          // for test purpose
          r = region.get(clientGet);
        }
        if (get.getExistenceOnly()) {
          boolean exists = r.getExists();
          if (region.getCoprocessorHost() != null) {
            exists = region.getCoprocessorHost().postExists(clientGet, exists);
          }
          existence = exists;
        }
      }
      if (existence != null) {
        // existence-only result: encode the boolean, marking it stale for non-default replicas
        ClientProtos.Result pbr =
          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
        builder.setResult(pbr);
      } else if (r != null) {
        ClientProtos.Result pbr;
        // Ship cells out-of-band via the cell block when the client supports it (>= 1.3);
        // otherwise embed them in the protobuf result.
        if (
          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
        ) {
          pbr = ProtobufUtil.toResultNoData(r);
          ((HBaseRpcController) controller)
            .setCellScanner(CellUtil.createCellScanner(r.rawCells()));
          addSize(context, r);
        } else {
          pbr = ProtobufUtil.toResult(r);
        }
        builder.setResult(pbr);
      }
      // r.cells is null when an table.exists(get) call
      if (r != null && r.rawCells() != null) {
        quota.addGetResult(r);
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Update metrics and release the quota even on exception paths.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null && region != null) {
        long blockBytesScanned = context != null ? context.getBlockBytesScanned() : 0;
        metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
          blockBytesScanned);
      }
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Server-side get implementation that runs the coprocessor pre/post hooks and executes the
   * get as a one-row scan. The created scanner is not closed here: it is registered either on
   * the RpcCallContext (single get) or on the multi() close-callback, which closes it after
   * the response is sent.
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
    RpcCallContext context) throws IOException {
    region.prepareGet(get);
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        // Hook short-circuited the get; results already populated by the coprocessor.
        region.metricsUpdateForGet();
        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
          stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet();

    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
  }

  /**
   * Warn (and optionally reject, per configuration) multi requests whose total action count
   * exceeds the configured row-size threshold (HBASE-18023).
   */
  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
    int sum = 0;
    String firstRegionName = null;
    for (RegionAction regionAction : request.getRegionActionList()) {
      if (sum == 0) {
        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
      }
      sum += regionAction.getActionCount();
    }
    if (sum > rowSizeWarnThreshold) {
      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
        + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
        + RpcServer.getRequestUserName().orElse(null) + "/"
        + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
      if (rejectRowsWithSizeOverThreshold) {
        throw new ServiceException(
          "Rejecting large batch operation for current batch with firstRegionName: "
            + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
            + rowSizeWarnThreshold);
      }
    }
  }

  /**
   * Record a whole-RegionAction failure in the response and skip the cells that belonged to its
   * mutations so the shared CellScanner stays aligned for subsequent actions.
   */
  private void failRegionAction(MultiResponse.Builder responseBuilder,
    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
    CellScanner cellScanner, Throwable error) {
    rpcServer.getMetrics().exception(error);
    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
    // All Mutations in this RegionAction not executed as we can not see the Region online here
    // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
    // corresponding to these Mutations.
    if (cellScanner != null) {
      skipCellsForMutations(regionAction.getActionList(), cellScanner);
    }
  }

  /**
   * Whether this action was issued by replication rather than a client, determined by the
   * replication attribute stamped on PUT/DELETE mutations by the replication sink.
   */
  private boolean isReplicationRequest(Action action) {
    // replication request can only be put or delete.
    if (!action.hasMutation()) {
      return false;
    }
    MutationProto mutation = action.getMutation();
    MutationType type = mutation.getMutateType();
    if (type != MutationType.PUT && type != MutationType.DELETE) {
      return false;
    }
    // replication will set a special attribute so we can make use of it to decide whether a request
    // is for replication.
    return mutation.getAttributeList().stream().map(p -> p.getName())
      .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
  }

  /**
   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
   * @param rpcc    the RPC controller
   * @param request the multi request
   */
  @Override
  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
    throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }

    checkBatchSizeAndLogLargeSize(request);

    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
    if (controller != null) {
      controller.setCellScanner(null);
    }

    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;

    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
    this.rpcMultiRequestCount.increment();
    this.requestCount.increment();
    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();

    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
    // following logic is for backward compatibility as old clients still use
    // MultiRequest#condition in case of checkAndMutate with RowMutations.
    if (request.hasCondition()) {
      if (request.getRegionActionList().isEmpty()) {
        // If the region action list is empty, do nothing.
        responseBuilder.setProcessed(true);
        return responseBuilder.build();
      }

      RegionAction regionAction = request.getRegionAction(0);

      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
      // we can assume regionAction.getAtomic() is true here.
      assert regionAction.getAtomic();

      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
          regionAction.hasCondition());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        return responseBuilder.build();
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
        // We only allow replication in standby state and it will not set the atomic flag.
        if (rejectIfFromClient) {
          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
            new DoNotRetryIOException(
              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
          return responseBuilder.build();
        }

        try {
          CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
            cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
          responseBuilder.setProcessed(result.isSuccess());
          ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
            ClientProtos.ResultOrException.newBuilder();
          for (int i = 0; i < regionAction.getActionCount(); i++) {
            // To unify the response format with doNonAtomicRegionMutation and read through
            // client's AsyncProcess we have to add an empty result instance per operation
            resultOrExceptionOrBuilder.clear();
            resultOrExceptionOrBuilder.setIndex(i);
            regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
          }
        } catch (IOException e) {
          rpcServer.getMetrics().exception(e);
          // As it's an atomic operation with a condition, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        }
      } finally {
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
      }
      return responseBuilder.build();
    }

    // this will contain all the cells that we need to return. It's created later, if needed.
    List<CellScannable> cellsToReturn = null;
    RegionScannersCloseCallBack closeCallBack = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
      new HashMap<>(request.getRegionActionCount());

    for (RegionAction regionAction : request.getRegionActionList()) {
      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();
      regionActionResultBuilder.clear();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
          regionAction.hasCondition());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        continue; // For this region it's a failure.
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);

        if (regionAction.hasCondition()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }

          try {
            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
              ClientProtos.ResultOrException.newBuilder();
            if (regionAction.getActionCount() == 1) {
              // Single-mutation checkAndMutate.
              CheckAndMutateResult result =
                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              resultOrExceptionOrBuilder.setIndex(0);
              if (result.getResult() != null) {
                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
              }
              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
            } else {
              // RowMutations-style checkAndMutate over several actions.
              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              for (int i = 0; i < regionAction.getActionCount(); i++) {
                if (i == 0 && result.getResult() != null) {
                  // Set the result of the Increment/Append operations to the first element of the
                  // ResultOrException list
                  resultOrExceptionOrBuilder.setIndex(i);
                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
                  continue;
                }
                // To unify the response format with doNonAtomicRegionMutation and read through
                // client's AsyncProcess we have to add an empty result instance per operation
                resultOrExceptionOrBuilder.clear();
                resultOrExceptionOrBuilder.setIndex(i);
                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
              }
            }
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's an atomic operation with a condition, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }
          try {
            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
              cellScanner, nonceGroup, spaceQuotaEnforcement);
            regionActionResultBuilder.setProcessed(true);
            // We no longer use MultiResponse#processed. Instead, we use
            // RegionActionResult#processed. This is for backward compatibility for old clients.
            responseBuilder.setProcessed(true);
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's atomic, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else {
          if (
            rejectIfFromClient && regionAction.getActionCount() > 0
              && !isReplicationRequest(regionAction.getAction(0))
          ) {
            // fail if it is not a replication request
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }
          // doNonAtomicRegionMutation manages the exception internally
          if (context != null && closeCallBack == null) {
            // An RpcCallBack that creates a list of scanners that needs to perform callBack
            // operation on completion of multiGets.
            // Set this only once
            closeCallBack = new RegionScannersCloseCallBack();
            context.setCallBack(closeCallBack);
          }
          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
            spaceQuotaEnforcement);
        }
      } finally {
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        regionStats.put(regionSpecifier, regionLoadStats);
      }
    }
    // Load the controller with the Cells to return.
    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
    }

    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
      builder.addRegion(stat.getKey());
      builder.addStat(stat.getValue());
    }
    responseBuilder.setRegionStatistics(builder);
    return responseBuilder.build();
  }

  /** Skip the cells of every action's mutation; keeps the shared CellScanner aligned. */
  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    for (Action action : actions) {
      skipCellsForMutation(action, cellScanner);
    }
  }

  /** Advance the CellScanner past the cells associated with one action's mutation, if any. */
  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    try {
      if (action.hasMutation()) {
        MutationProto m = action.getMutation();
        if (m.hasAssociatedCellCount()) {
          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
            cellScanner.advance();
          }
        }
      }
    } catch (IOException e) {
      // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
      // marked as failed as we could not see the Region here. At client side the top level
      // RegionAction exception will be considered first.
      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
    }
  }

  /**
   * Mutate data in a table.
   * @param rpcc    the RPC controller
   * @param request the mutate request
   */
  @Override
  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
    throws ServiceException {
    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
    OperationQuota quota = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    // Clear scanner so we are not holding on to reference across call.
    if (controller != null) {
      controller.setCellScanner(null);
    }
    try {
      checkOpen();
      requestCount.increment();
      rpcMutateRequestCount.increment();
      HRegion region = getRegion(request.getRegion());
      rejectIfInStandByState(region);
      MutateResponse.Builder builder = MutateResponse.newBuilder();
      MutationProto mutation = request.getMutation();
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
      OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request);
      quota = getRpcQuotaManager().checkBatchQuota(region, operationType);
      ActivePolicyEnforcement spaceQuotaEnforcement =
        getSpaceQuotaManager().getActiveEnforcements();

      if (request.hasCondition()) {
        // Conditional path: delegate to checkAndMutate and report its success flag.
        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
          request.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
        builder.setProcessed(result.isSuccess());
        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
        if (clientCellBlockSupported) {
          addSize(context, result.getResult());
        }
      } else {
        Result r = null;
        Boolean processed = null;
        MutationType type = mutation.getMutateType();
        switch (type) {
          case APPEND:
            // TODO: this doesn't actually check anything.
            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
              context);
            break;
          case INCREMENT:
            // TODO: this doesn't actually check anything.
            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
              context);
            break;
          case PUT:
            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          case DELETE:
            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          default:
            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
        }
        if (processed != null) {
          builder.setProcessed(processed);
        }
        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
        addResult(builder, r, controller, clientCellBlockSupported);
        if (clientCellBlockSupported) {
          addSize(context, r);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      // An IO failure here may indicate filesystem trouble; trigger a filesystem check.
      server.checkFileSystem();
      throw new ServiceException(ie);
    } finally {
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Execute a single Put after cell-size, space-quota and throttle-quota checks; records put
   * latency metrics.
   */
  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Put put = ProtobufUtil.toPut(mutation, cellScanner);
    checkCellSizeLimit(region, put);
    spaceQuota.getPolicyEnforcement(region).check(put);
    quota.addMutation(put);
    region.put(put);

    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updatePut(region, after - before);
    }
  }

  /**
   * Execute a single Delete after cell-size, space-quota and throttle-quota checks; records
   * delete latency metrics.
   */
  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
    checkCellSizeLimit(region, delete);
    spaceQuota.getPolicyEnforcement(region).check(delete);
    quota.addMutation(delete);
    region.delete(delete);

    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updateDelete(region, after - before);
    }
  }

  /**
   * Execute a single-mutation checkAndMutate, honoring the coprocessor pre/post hooks (the
   * pre-hook may supply the result and bypass the region call) and updating checkAndMutate
   * metrics, including the type-specific checkAndPut/checkAndDelete latencies.
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
    CheckAndMutateResult checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
    quota.addMutation((Mutation) checkAndMutate.getAction());

    CheckAndMutateResult result = null;
    if (region.getCoprocessorHost() != null) {
      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
    }
    if (result == null) {
      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
      if (region.getCoprocessorHost() != null) {
        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
      }
    }
    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      long blockBytesScanned =
        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
      metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned);

      MutationType type = mutation.getMutateType();
      switch (type) {
        case PUT:
          metricsRegionServer.updateCheckAndPut(region, after - before);
          break;
        case DELETE:
          metricsRegionServer.updateCheckAndDelete(region, after - before);
          break;
        default:
          break;
      }
    }
    return result;
  }

  // This is used to keep compatible with the old client implementation. Consider remove it if we
  // decide to drop the support of the client that still sends close request to a region scanner
  // which has already been exhausted.
3090 @Deprecated 3091 private static final IOException SCANNER_ALREADY_CLOSED = new IOException() { 3092 3093 private static final long serialVersionUID = -4305297078988180130L; 3094 3095 @Override 3096 public synchronized Throwable fillInStackTrace() { 3097 return this; 3098 } 3099 }; 3100 3101 private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException { 3102 String scannerName = toScannerName(request.getScannerId()); 3103 RegionScannerHolder rsh = this.scanners.get(scannerName); 3104 if (rsh == null) { 3105 // just ignore the next or close request if scanner does not exists. 3106 if (closedScanners.getIfPresent(scannerName) != null) { 3107 throw SCANNER_ALREADY_CLOSED; 3108 } else { 3109 LOG.warn("Client tried to access missing scanner " + scannerName); 3110 throw new UnknownScannerException( 3111 "Unknown scanner '" + scannerName + "'. This can happen due to any of the following " 3112 + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " 3113 + "long wait between consecutive client checkins, c) Server may be closing down, " 3114 + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " 3115 + "possible fix would be increasing the value of" 3116 + "'hbase.client.scanner.timeout.period' configuration."); 3117 } 3118 } 3119 rejectIfInStandByState(rsh.r); 3120 RegionInfo hri = rsh.s.getRegionInfo(); 3121 // Yes, should be the same instance 3122 if (server.getOnlineRegion(hri.getRegionName()) != rsh.r) { 3123 String msg = "Region has changed on the scanner " + scannerName + ": regionName=" 3124 + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r; 3125 LOG.warn(msg + ", closing..."); 3126 scanners.remove(scannerName); 3127 try { 3128 rsh.s.close(); 3129 } catch (IOException e) { 3130 LOG.warn("Getting exception closing " + scannerName, e); 3131 } finally { 3132 try { 3133 server.getLeaseManager().cancelLease(scannerName); 3134 } catch (LeaseException e) { 3135 LOG.warn("Getting 
exception closing " + scannerName, e); 3136 } 3137 } 3138 throw new NotServingRegionException(msg); 3139 } 3140 return rsh; 3141 } 3142 3143 /** 3144 * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder 3145 * value. 3146 */ 3147 private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request, 3148 ScanResponse.Builder builder) throws IOException { 3149 HRegion region = getRegion(request.getRegion()); 3150 rejectIfInStandByState(region); 3151 ClientProtos.Scan protoScan = request.getScan(); 3152 boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); 3153 Scan scan = ProtobufUtil.toScan(protoScan); 3154 // if the request doesn't set this, get the default region setting. 3155 if (!isLoadingCfsOnDemandSet) { 3156 scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); 3157 } 3158 3159 if (!scan.hasFamilies()) { 3160 // Adding all families to scanner 3161 for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) { 3162 scan.addFamily(family); 3163 } 3164 } 3165 if (region.getCoprocessorHost() != null) { 3166 // preScannerOpen is not allowed to return a RegionScanner. 
Only post hook can create a 3167 // wrapper for the core created RegionScanner 3168 region.getCoprocessorHost().preScannerOpen(scan); 3169 } 3170 RegionScannerImpl coreScanner = region.getScanner(scan); 3171 Shipper shipper = coreScanner; 3172 RegionScanner scanner = coreScanner; 3173 try { 3174 if (region.getCoprocessorHost() != null) { 3175 scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); 3176 } 3177 } catch (Exception e) { 3178 // Although region coprocessor is for advanced users and they should take care of the 3179 // implementation to not damage the HBase system, closing the scanner on exception here does 3180 // not have any bad side effect, so let's do it 3181 scanner.close(); 3182 throw e; 3183 } 3184 long scannerId = scannerIdGenerator.generateNewScannerId(); 3185 builder.setScannerId(scannerId); 3186 builder.setMvccReadPoint(scanner.getMvccReadPoint()); 3187 builder.setTtl(scannerLeaseTimeoutPeriod); 3188 String scannerName = toScannerName(scannerId); 3189 3190 boolean fullRegionScan = 3191 !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region); 3192 3193 return new Pair<String, RegionScannerHolder>(scannerName, 3194 addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan)); 3195 } 3196 3197 /** 3198 * The returned String is used as key doing look up of outstanding Scanners in this Servers' 3199 * this.scanners, the Map of outstanding scanners and their current state. 3200 * @param scannerId A scanner long id. 3201 * @return The long id as a String. 3202 */ 3203 private static String toScannerName(long scannerId) { 3204 return Long.toString(scannerId); 3205 } 3206 3207 private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh) 3208 throws OutOfOrderScannerNextException { 3209 // if nextCallSeq does not match throw Exception straight away. This needs to be 3210 // performed even before checking of Lease. 
3211 // See HBASE-5974 3212 if (request.hasNextCallSeq()) { 3213 long callSeq = request.getNextCallSeq(); 3214 if (!rsh.incNextCallSeq(callSeq)) { 3215 throw new OutOfOrderScannerNextException( 3216 "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: " 3217 + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request)); 3218 } 3219 } 3220 } 3221 3222 private void addScannerLeaseBack(LeaseManager.Lease lease) { 3223 try { 3224 server.getLeaseManager().addLease(lease); 3225 } catch (LeaseStillHeldException e) { 3226 // should not happen as the scanner id is unique. 3227 throw new AssertionError(e); 3228 } 3229 } 3230 3231 // visible for testing only 3232 long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller, 3233 boolean allowHeartbeatMessages) { 3234 // Set the time limit to be half of the more restrictive timeout value (one of the 3235 // timeout values must be positive). In the event that both values are positive, the 3236 // more restrictive of the two is used to calculate the limit. 3237 if (allowHeartbeatMessages) { 3238 long now = EnvironmentEdgeManager.currentTime(); 3239 long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now); 3240 if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) { 3241 long timeLimitDelta; 3242 if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) { 3243 timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout); 3244 } else { 3245 timeLimitDelta = 3246 scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout; 3247 } 3248 3249 // Use half of whichever timeout value was more restrictive... But don't allow 3250 // the time limit to be less than the allowable minimum (could cause an 3251 // immediate timeout before scanning any data). 
3252 timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta); 3253 return now + timeLimitDelta; 3254 } 3255 } 3256 // Default value of timeLimit is negative to indicate no timeLimit should be 3257 // enforced. 3258 return -1L; 3259 } 3260 3261 private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) { 3262 long timeout; 3263 if (controller != null && controller.getCallTimeout() > 0) { 3264 timeout = controller.getCallTimeout(); 3265 } else if (rpcTimeout > 0) { 3266 timeout = rpcTimeout; 3267 } else { 3268 return -1; 3269 } 3270 if (call != null) { 3271 timeout -= (now - call.getReceiveTime()); 3272 } 3273 // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high. 3274 // return minimum value here in that case so we count this in calculating the final delta. 3275 return Math.max(minimumScanTimeLimitDelta, timeout); 3276 } 3277 3278 private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows, 3279 ScannerContext scannerContext, ScanResponse.Builder builder) { 3280 if (numOfCompleteRows >= limitOfRows) { 3281 if (LOG.isTraceEnabled()) { 3282 LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows 3283 + " scannerContext: " + scannerContext); 3284 } 3285 builder.setMoreResults(false); 3286 } 3287 } 3288 3289 // return whether we have more results in region. 3290 private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh, 3291 long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results, 3292 ScanResponse.Builder builder, RpcCall rpcCall) throws IOException { 3293 HRegion region = rsh.r; 3294 RegionScanner scanner = rsh.s; 3295 long maxResultSize; 3296 if (scanner.getMaxResultSize() > 0) { 3297 maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize); 3298 } else { 3299 maxResultSize = maxQuotaResultSize; 3300 } 3301 // This is cells inside a row. 
    // Default size is 10 so if many versions or many cfs,
    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
    // arbitrary 32. TODO: keep record of general size of results being returned.
    ArrayList<Cell> values = new ArrayList<>(32);
    region.startRegionOperation(Operation.SCAN);
    long before = EnvironmentEdgeManager.currentTime();
    // Used to check if we've matched the row limit set on the Scan
    int numOfCompleteRows = 0;
    // Count of times we call nextRaw; can be > numOfCompleteRows.
    int numOfNextRawCalls = 0;
    try {
      int numOfResults = 0;
      synchronized (scanner) {
        // Results from non-default replicas are marked stale for the client.
        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
        boolean clientHandlesPartials =
          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
        boolean clientHandlesHeartbeats =
          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();

        // On the server side we must ensure that the correct ordering of partial results is
        // returned to the client to allow them to properly reconstruct the partial results.
        // If the coprocessor host is adding to the result list, we cannot guarantee the
        // correct ordering of partial results and so we prevent partial results from being
        // formed.
        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
        boolean moreRows = false;

        // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
        // certain time threshold on the server. When the time threshold is exceeded, the
        // server stops the scan and sends back whatever Results it has accumulated within
        // that time period (may be empty). Since heartbeat messages have the potential to
        // create partial Results (in the event that the timeout occurs in the middle of a
        // row), we must only generate heartbeat messages when the client can handle both
        // heartbeats AND partials
        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;

        long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);

        final LimitScope sizeScope =
          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
        final LimitScope timeScope =
          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;

        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();

        // Configure with limits for this RPC. Set keep progress true since size progress
        // towards size limit should be kept between calls to nextRaw
        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
        // of heap size occupied by cells(being read). Cell data means its key and value parts.
        // maxQuotaResultSize - max results just from server side configuration and quotas, without
        // user's specified max. We use this for evaluating limits based on blocks (not cells).
        // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
        // cell or block size from maximum here so we adhere to total limits of request.
        // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
        // have accumulated block bytes. If not, this will be 0 for block size.
        long maxCellSize = maxResultSize;
        long maxBlockSize = maxQuotaResultSize;
        if (rpcCall != null) {
          maxBlockSize -= rpcCall.getBlockBytesScanned();
          maxCellSize -= rpcCall.getResponseCellSize();
        }

        contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
        contextBuilder.setBatchLimit(scanner.getBatch());
        contextBuilder.setTimeLimit(timeScope, timeLimit);
        contextBuilder.setTrackMetrics(trackMetrics);
        ScannerContext scannerContext = contextBuilder.build();
        boolean limitReached = false;
        while (numOfResults < maxResults) {
          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
          // batch limit is a limit on the number of cells per Result. Thus, if progress is
          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
          // reset the batch progress between nextRaw invocations since we don't want the
          // batch progress from previous calls to affect future calls
          scannerContext.setBatchProgress(0);
          assert values.isEmpty();

          // Collect values to be returned here
          moreRows = scanner.nextRaw(values, scannerContext);
          if (rpcCall == null) {
            // When there is no RpcCallContext,copy EC to heap, then the scanner would close,
            // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
            // buffers.See more details in HBASE-26036.
            CellUtil.cloneIfNecessary(values);
          }
          numOfNextRawCalls++;

          if (!values.isEmpty()) {
            if (limitOfRows > 0) {
              // First we need to check if the last result is partial and we have a row change. If
              // so then we need to increase the numOfCompleteRows.
              if (results.isEmpty()) {
                if (
                  rsh.rowOfLastPartialResult != null
                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              } else {
                Result lastResult = results.get(results.size() - 1);
                if (
                  lastResult.mayHaveMoreCellsInRow()
                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              }
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
            results.add(r);
            numOfResults++;
            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
              numOfCompleteRows++;
              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
          } else if (!moreRows && !results.isEmpty()) {
            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
            // last result is false. Otherwise it's possible that: the first nextRaw returned
            // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
            // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
            // return with moreRows=false, which means moreResultsInRegion would be false, it will
            // be a contradictory state (HBASE-21206).
            int lastIdx = results.size() - 1;
            Result r = results.get(lastIdx);
            if (r.mayHaveMoreCellsInRow()) {
              results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
            }
          }
          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
          boolean resultsLimitReached = numOfResults >= maxResults;
          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

          if (limitReached || !moreRows) {
            // With block size limit, we may exceed size limit without collecting any results.
            // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
            // or cursor if results were collected, for example for cell size or heap size limits.
            boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
            // We only want to mark a ScanResponse as a heartbeat message in the event that
            // there are more values to be read server side. If there aren't more values,
            // marking it as a heartbeat is wasteful because the client will need to issue
            // another ScanRequest only to realize that they already have all the values
            if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
              // Heartbeat messages occur when the time limit has been reached, or size limit has
              // been reached before collecting any results. This can happen for heavily filtered
              // scans which scan over too many blocks.
              builder.setHeartbeatMessage(true);
              if (rsh.needCursor) {
                // Return the last peeked cell as a cursor so the client can show scan progress.
                Cell cursorCell = scannerContext.getLastPeekedCell();
                if (cursorCell != null) {
                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
                }
              }
            }
            break;
          }
          values.clear();
        }
        if (rpcCall != null) {
          rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
        }
        builder.setMoreResultsInRegion(moreRows);
        // Check to see if the client requested that we track metrics server side. If the
        // client requested metrics, retrieve the metrics from the scanner context.
        if (trackMetrics) {
          // rather than increment yet another counter in StoreScanner, just set the value here
          // from block size progress before writing into the response
          scannerContext.getMetrics().countOfBlockBytesScanned
            .set(scannerContext.getBlockSizeProgress());
          if (rpcCall != null) {
            scannerContext.getMetrics().fsReadTime.set(rpcCall.getFsReadTime());
          }
          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();

          for (Entry<String, Long> entry : metrics.entrySet()) {
            pairBuilder.setName(entry.getKey());
            pairBuilder.setValue(entry.getValue());
            metricBuilder.addMetrics(pairBuilder.build());
          }

          builder.setScanMetrics(metricBuilder.build());
        }
      }
    } finally {
      region.closeRegionOperation();
      // Update serverside metrics, even on error.
3502 long end = EnvironmentEdgeManager.currentTime(); 3503 3504 long responseCellSize = 0; 3505 long blockBytesScanned = 0; 3506 if (rpcCall != null) { 3507 responseCellSize = rpcCall.getResponseCellSize(); 3508 blockBytesScanned = rpcCall.getBlockBytesScanned(); 3509 rsh.updateBlockBytesScanned(blockBytesScanned); 3510 } 3511 region.getMetrics().updateScan(); 3512 final MetricsRegionServer metricsRegionServer = server.getMetrics(); 3513 if (metricsRegionServer != null) { 3514 metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned); 3515 metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls); 3516 } 3517 } 3518 // coprocessor postNext hook 3519 if (region.getCoprocessorHost() != null) { 3520 region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true); 3521 } 3522 } 3523 3524 /** 3525 * Scan data in a table. 3526 * @param controller the RPC controller 3527 * @param request the scan request 3528 */ 3529 @Override 3530 public ScanResponse scan(final RpcController controller, final ScanRequest request) 3531 throws ServiceException { 3532 if (controller != null && !(controller instanceof HBaseRpcController)) { 3533 throw new UnsupportedOperationException( 3534 "We only do " + "HBaseRpcControllers! 
FIX IF A PROBLEM: " + controller); 3535 } 3536 if (!request.hasScannerId() && !request.hasScan()) { 3537 throw new ServiceException( 3538 new DoNotRetryIOException("Missing required input: scannerId or scan")); 3539 } 3540 try { 3541 checkOpen(); 3542 } catch (IOException e) { 3543 if (request.hasScannerId()) { 3544 String scannerName = toScannerName(request.getScannerId()); 3545 if (LOG.isDebugEnabled()) { 3546 LOG.debug( 3547 "Server shutting down and client tried to access missing scanner " + scannerName); 3548 } 3549 final LeaseManager leaseManager = server.getLeaseManager(); 3550 if (leaseManager != null) { 3551 try { 3552 leaseManager.cancelLease(scannerName); 3553 } catch (LeaseException le) { 3554 // No problem, ignore 3555 if (LOG.isTraceEnabled()) { 3556 LOG.trace("Un-able to cancel lease of scanner. It could already be closed."); 3557 } 3558 } 3559 } 3560 } 3561 throw new ServiceException(e); 3562 } 3563 requestCount.increment(); 3564 rpcScanRequestCount.increment(); 3565 RegionScannerHolder rsh; 3566 ScanResponse.Builder builder = ScanResponse.newBuilder(); 3567 String scannerName; 3568 try { 3569 if (request.hasScannerId()) { 3570 // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000 3571 // for more details. 3572 long scannerId = request.getScannerId(); 3573 builder.setScannerId(scannerId); 3574 scannerName = toScannerName(scannerId); 3575 rsh = getRegionScanner(request); 3576 } else { 3577 Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder); 3578 scannerName = scannerNameAndRSH.getFirst(); 3579 rsh = scannerNameAndRSH.getSecond(); 3580 } 3581 } catch (IOException e) { 3582 if (e == SCANNER_ALREADY_CLOSED) { 3583 // Now we will close scanner automatically if there are no more results for this region but 3584 // the old client will still send a close request to us. Just ignore it and return. 
3585 return builder.build(); 3586 } 3587 throw new ServiceException(e); 3588 } 3589 if (rsh.fullRegionScan) { 3590 rpcFullScanRequestCount.increment(); 3591 } 3592 HRegion region = rsh.r; 3593 LeaseManager.Lease lease; 3594 try { 3595 // Remove lease while its being processed in server; protects against case 3596 // where processing of request takes > lease expiration time. or null if none found. 3597 lease = server.getLeaseManager().removeLease(scannerName); 3598 } catch (LeaseException e) { 3599 throw new ServiceException(e); 3600 } 3601 if (request.hasRenew() && request.getRenew()) { 3602 // add back and return 3603 addScannerLeaseBack(lease); 3604 try { 3605 checkScanNextCallSeq(request, rsh); 3606 } catch (OutOfOrderScannerNextException e) { 3607 throw new ServiceException(e); 3608 } 3609 return builder.build(); 3610 } 3611 OperationQuota quota; 3612 try { 3613 quota = getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize, 3614 rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference()); 3615 } catch (IOException e) { 3616 addScannerLeaseBack(lease); 3617 throw new ServiceException(e); 3618 } 3619 try { 3620 checkScanNextCallSeq(request, rsh); 3621 } catch (OutOfOrderScannerNextException e) { 3622 addScannerLeaseBack(lease); 3623 throw new ServiceException(e); 3624 } 3625 // Now we have increased the next call sequence. If we give client an error, the retry will 3626 // never success. So we'd better close the scanner and return a DoNotRetryIOException to client 3627 // and then client will try to open a new scanner. 3628 boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false; 3629 int rows; // this is scan.getCaching 3630 if (request.hasNumberOfRows()) { 3631 rows = request.getNumberOfRows(); 3632 } else { 3633 rows = closeScanner ? 0 : 1; 3634 } 3635 RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null); 3636 // now let's do the real scan. 
3637 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize()); 3638 RegionScanner scanner = rsh.s; 3639 // this is the limit of rows for this scan, if we the number of rows reach this value, we will 3640 // close the scanner. 3641 int limitOfRows; 3642 if (request.hasLimitOfRows()) { 3643 limitOfRows = request.getLimitOfRows(); 3644 } else { 3645 limitOfRows = -1; 3646 } 3647 boolean scannerClosed = false; 3648 try { 3649 List<Result> results = new ArrayList<>(Math.min(rows, 512)); 3650 if (rows > 0) { 3651 boolean done = false; 3652 // Call coprocessor. Get region info from scanner. 3653 if (region.getCoprocessorHost() != null) { 3654 Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows); 3655 if (!results.isEmpty()) { 3656 for (Result r : results) { 3657 // add cell size from CP results so we can track response size and update limits 3658 // when calling scan below if !done. We'll also have tracked block size if the CP 3659 // got results from hbase, since StoreScanner tracks that for all calls automatically. 3660 addSize(rpcCall, r); 3661 } 3662 } 3663 if (bypass != null && bypass.booleanValue()) { 3664 done = true; 3665 } 3666 } 3667 if (!done) { 3668 scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows, 3669 results, builder, rpcCall); 3670 } else { 3671 builder.setMoreResultsInRegion(!results.isEmpty()); 3672 } 3673 } else { 3674 // This is a open scanner call with numberOfRow = 0, so set more results in region to true. 3675 builder.setMoreResultsInRegion(true); 3676 } 3677 3678 quota.addScanResult(results); 3679 addResults(builder, results, (HBaseRpcController) controller, 3680 RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()), 3681 isClientCellBlockSupport(rpcCall)); 3682 if (scanner.isFilterDone() && results.isEmpty()) { 3683 // If the scanner's filter - if any - is done with the scan 3684 // only set moreResults to false if the results is empty. 
This is used to keep compatible 3685 // with the old scan implementation where we just ignore the returned results if moreResults 3686 // is false. Can remove the isEmpty check after we get rid of the old implementation. 3687 builder.setMoreResults(false); 3688 } 3689 // Later we may close the scanner depending on this flag so here we need to make sure that we 3690 // have already set this flag. 3691 assert builder.hasMoreResultsInRegion(); 3692 // we only set moreResults to false in the above code, so set it to true if we haven't set it 3693 // yet. 3694 if (!builder.hasMoreResults()) { 3695 builder.setMoreResults(true); 3696 } 3697 if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) { 3698 // Record the last cell of the last result if it is a partial result 3699 // We need this to calculate the complete rows we have returned to client as the 3700 // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the 3701 // current row. We may filter out all the remaining cells for the current row and just 3702 // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to 3703 // check for row change. 3704 Result lastResult = results.get(results.size() - 1); 3705 if (lastResult.mayHaveMoreCellsInRow()) { 3706 rsh.rowOfLastPartialResult = lastResult.getRow(); 3707 } else { 3708 rsh.rowOfLastPartialResult = null; 3709 } 3710 } 3711 if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) { 3712 scannerClosed = true; 3713 closeScanner(region, scanner, scannerName, rpcCall); 3714 } 3715 3716 // There's no point returning to a timed out client. 
Throwing ensures scanner is closed 3717 if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) { 3718 throw new TimeoutIOException("Client deadline exceeded, cannot return results"); 3719 } 3720 3721 return builder.build(); 3722 } catch (IOException e) { 3723 try { 3724 // scanner is closed here 3725 scannerClosed = true; 3726 // The scanner state might be left in a dirty state, so we will tell the Client to 3727 // fail this RPC and close the scanner while opening up another one from the start of 3728 // row that the client has last seen. 3729 closeScanner(region, scanner, scannerName, rpcCall); 3730 3731 // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is 3732 // used in two different semantics. 3733 // (1) The first is to close the client scanner and bubble up the exception all the way 3734 // to the application. This is preferred when the exception is really un-recoverable 3735 // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this 3736 // bucket usually. 3737 // (2) Second semantics is to close the current region scanner only, but continue the 3738 // client scanner by overriding the exception. This is usually UnknownScannerException, 3739 // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the 3740 // application-level ClientScanner has to continue without bubbling up the exception to 3741 // the client. See ClientScanner code to see how it deals with these special exceptions. 3742 if (e instanceof DoNotRetryIOException) { 3743 throw e; 3744 } 3745 3746 // If it is a FileNotFoundException, wrap as a 3747 // DoNotRetryIOException. This can avoid the retry in ClientScanner. 3748 if (e instanceof FileNotFoundException) { 3749 throw new DoNotRetryIOException(e); 3750 } 3751 3752 // We closed the scanner already. 
        // Instead of throwing the IOException, and client
        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
        // a special exception to save an RPC.
        if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
          // 1.4.0+ clients know how to handle
          throw new ScannerResetException("Scanner is closed on the server-side", e);
        } else {
          // older clients do not know about SRE. Just throw USE, which they will handle
          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
            + " scanner state for clients older than 1.3.", e);
        }
      } catch (IOException ioe) {
        throw new ServiceException(ioe);
      }
    } finally {
      if (!scannerClosed) {
        // Adding resets expiration time on lease.
        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
        if (rpcCall != null) {
          rpcCall.setCallBack(rsh.shippedCallback);
        } else {
          // If context is null, here we call rsh.shippedCallback directly to reuse the logic in
          // rsh.shippedCallback to release the internal resources in rsh, and lease is also added
          // back to regionserver's LeaseManager in rsh.shippedCallback.
          runShippedCallback(rsh);
        }
      }
      quota.close();
    }
  }

  /**
   * Runs the holder's shipped callback directly. Used when there is no RPC call context to attach
   * the callback to; any IOException from the callback is rethrown as a ServiceException so the
   * RPC layer reports it to the client.
   */
  private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
    assert rsh.shippedCallback != null;
    try {
      rsh.shippedCallback.run();
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
  }

  /**
   * Closes a region scanner and removes it from the active-scanner map.
   * <p>
   * A coprocessor's preScannerClose hook may bypass the actual close entirely. When an RPC call
   * context is available, the real close is deferred to the holder's close callback (run after the
   * response has been shipped); otherwise the scanner is closed inline. The scanner name is
   * recorded in {@code closedScanners} so a late or duplicate close request can be recognized.
   * @param region      the region the scanner is reading
   * @param scanner     the scanner being closed
   * @param scannerName key of the scanner in the active-scanner map
   * @param context     RPC call context, may be null (e.g. lease expiration)
   * @throws IOException if a coprocessor hook fails
   */
  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
    RpcCallContext context) throws IOException {
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preScannerClose(scanner)) {
        // bypass the actual close.
        return;
      }
    }
    RegionScannerHolder rsh = scanners.remove(scannerName);
    if (rsh != null) {
      if (context != null) {
        // Defer the real close to the RPC callback so it happens after the response is shipped.
        context.setCallBack(rsh.closeCallBack);
      } else {
        rsh.s.close();
      }
      if (region.getCoprocessorHost() != null) {
        region.getCoprocessorHost().postScannerClose(scanner);
      }
      // Remember the name so a duplicate/late close RPC for this scanner can be handled.
      closedScanners.put(scannerName, scannerName);
    }
  }

  /**
   * Executes a region-server-level coprocessor endpoint call by delegating to the server.
   */
  @Override
  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
    CoprocessorServiceRequest request) throws ServiceException {
    rpcPreCheck("execRegionServerService");
    return server.execRegionServerService(controller, request);
  }

  /**
   * Returns the space-quota snapshots currently known to this region server. Returns an empty
   * response when the quota manager is not running.
   */
  @Override
  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
    GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
    try {
      final RegionServerSpaceQuotaManager manager = server.getRegionServerSpaceQuotaManager();
      final GetSpaceQuotaSnapshotsResponse.Builder builder =
        GetSpaceQuotaSnapshotsResponse.newBuilder();
      if (manager != null) {
        // Work on a copy of the snapshots so the manager's state is not exposed.
        final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
        for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
            .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
            .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build());
        }
      }
      return builder.build();
    } catch (Exception e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Evicts blocks for the requested regions from the block cache, collecting per-region eviction
   * statistics (failures are recorded in the stats rather than failing the whole call).
   */
  @Override
  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
    ClearRegionBlockCacheRequest request) throws ServiceException {
    try {
      rpcPreCheck("clearRegionBlockCache");
      ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder();
      CacheEvictionStatsBuilder stats =
        CacheEvictionStats.builder();
      server.getRegionServerCoprocessorHost().preClearRegionBlockCache();
      List<HRegion> regions = getRegions(request.getRegionList(), stats);
      for (HRegion region : regions) {
        try {
          // Best effort: a failure for one region is recorded, not rethrown, so the
          // remaining regions are still processed.
          stats = stats.append(this.server.clearRegionBlockCache(region));
        } catch (Exception e) {
          stats.addException(region.getRegionInfo().getRegionName(), e);
        }
      }
      stats.withMaxCacheSize(server.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
      server.getRegionServerCoprocessorHost().postClearRegionBlockCache(stats.build());
      return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Schedules an AssignRegionHandler for each region in the open request.
   * @param request open-region request from the master; may carry the master's system time
   * @param tdCache per-batch cache of table descriptors so the (possibly filesystem-backed)
   *                descriptor lookup is done at most once per table
   */
  private void executeOpenRegionProcedures(OpenRegionRequest request,
    Map<TableName, TableDescriptor> tdCache) {
    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableName tableName = regionInfo.getTable();
      TableDescriptor tableDesc = tdCache.get(tableName);
      if (tableDesc == null) {
        try {
          tableDesc = server.getTableDescriptors().get(regionInfo.getTable());
        } catch (IOException e) {
          // Here we do not fail the whole method since we also need to deal with other
          // procedures, and we can not ignore this one, so we still schedule an
          // AssignRegionHandler (with a null descriptor) and it will report back to master if we
          // still can not get the TableDescriptor.
          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
            regionInfo.getTable(), e);
        }
        if (tableDesc != null) {
          tdCache.put(tableName, tableDesc);
        }
      }
      if (regionOpenInfo.getFavoredNodesCount() > 0) {
        server.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
          regionOpenInfo.getFavoredNodesList());
      }
      long procId = regionOpenInfo.getOpenProcId();
      // submitRegionProcedure returns false for a duplicate submission; skip those.
      if (server.submitRegionProcedure(procId)) {
        server.getExecutorService().submit(
          AssignRegionHandler.create(server, regionInfo, procId, tableDesc, masterSystemTime));
      }
    }
  }

  /**
   * Schedules an UnassignRegionHandler for the region named in the close request.
   * The DoNotRetryIOException from decoding the region specifier is considered impossible for a
   * well-formed master request, hence the UncheckedIOException wrap.
   */
  private void executeCloseRegionProcedures(CloseRegionRequest request) {
    String encodedName;
    try {
      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
    } catch (DoNotRetryIOException e) {
      throw new UncheckedIOException("Should not happen", e);
    }
    ServerName destination = request.hasDestinationServer()
      ?
ProtobufUtil.toServerName(request.getDestinationServer()) 3909 : null; 3910 long procId = request.getCloseProcId(); 3911 boolean evictCache = request.getEvictCache(); 3912 if (server.submitRegionProcedure(procId)) { 3913 server.getExecutorService().submit( 3914 UnassignRegionHandler.create(server, encodedName, procId, false, destination, evictCache)); 3915 } 3916 } 3917 3918 private void executeProcedures(RemoteProcedureRequest request) { 3919 RSProcedureCallable callable; 3920 try { 3921 callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class) 3922 .getDeclaredConstructor().newInstance(); 3923 } catch (Exception e) { 3924 LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(), 3925 request.getProcId(), e); 3926 server.remoteProcedureComplete(request.getProcId(), e); 3927 return; 3928 } 3929 callable.init(request.getProcData().toByteArray(), server); 3930 LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId()); 3931 server.executeProcedure(request.getProcId(), callable); 3932 } 3933 3934 @Override 3935 @QosPriority(priority = HConstants.ADMIN_QOS) 3936 public ExecuteProceduresResponse executeProcedures(RpcController controller, 3937 ExecuteProceduresRequest request) throws ServiceException { 3938 try { 3939 checkOpen(); 3940 throwOnWrongStartCode(request); 3941 server.getRegionServerCoprocessorHost().preExecuteProcedures(); 3942 if (request.getOpenRegionCount() > 0) { 3943 // Avoid reading from the TableDescritor every time(usually it will read from the file 3944 // system) 3945 Map<TableName, TableDescriptor> tdCache = new HashMap<>(); 3946 request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache)); 3947 } 3948 if (request.getCloseRegionCount() > 0) { 3949 request.getCloseRegionList().forEach(this::executeCloseRegionProcedures); 3950 } 3951 if (request.getProcCount() > 0) { 3952 request.getProcList().forEach(this::executeProcedures); 3953 } 
3954 server.getRegionServerCoprocessorHost().postExecuteProcedures(); 3955 return ExecuteProceduresResponse.getDefaultInstance(); 3956 } catch (IOException e) { 3957 throw new ServiceException(e); 3958 } 3959 } 3960 3961 @Override 3962 public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller, 3963 GetAllBootstrapNodesRequest request) throws ServiceException { 3964 GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder(); 3965 server.getBootstrapNodes() 3966 .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server))); 3967 return builder.build(); 3968 } 3969 3970 private void setReloadableGuardrails(Configuration conf) { 3971 rowSizeWarnThreshold = 3972 conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT); 3973 rejectRowsWithSizeOverThreshold = 3974 conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD); 3975 maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 3976 HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); 3977 } 3978 3979 @Override 3980 public void onConfigurationChange(Configuration conf) { 3981 super.onConfigurationChange(conf); 3982 setReloadableGuardrails(conf); 3983 } 3984 3985 @Override 3986 public GetCachedFilesListResponse getCachedFilesList(RpcController controller, 3987 GetCachedFilesListRequest request) throws ServiceException { 3988 GetCachedFilesListResponse.Builder responseBuilder = GetCachedFilesListResponse.newBuilder(); 3989 List<String> fullyCachedFiles = new ArrayList<>(); 3990 server.getBlockCache().flatMap(BlockCache::getFullyCachedFiles).ifPresent(fcf -> { 3991 fullyCachedFiles.addAll(fcf.keySet()); 3992 }); 3993 return responseBuilder.addAllCachedFiles(fullyCachedFiles).build(); 3994 } 3995}