001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.UncheckedIOException; 024import java.net.BindException; 025import java.net.InetAddress; 026import java.net.InetSocketAddress; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.Iterator; 032import java.util.List; 033import java.util.Map; 034import java.util.Map.Entry; 035import java.util.NavigableMap; 036import java.util.Set; 037import java.util.TreeSet; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentMap; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicBoolean; 042import java.util.concurrent.atomic.AtomicLong; 043import java.util.concurrent.atomic.LongAdder; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.fs.FileSystem; 046import org.apache.hadoop.fs.Path; 047import org.apache.hadoop.hbase.CacheEvictionStats; 048import 
org.apache.hadoop.hbase.CacheEvictionStatsBuilder; 049import org.apache.hadoop.hbase.Cell; 050import org.apache.hadoop.hbase.CellScanner; 051import org.apache.hadoop.hbase.CellUtil; 052import org.apache.hadoop.hbase.DoNotRetryIOException; 053import org.apache.hadoop.hbase.DroppedSnapshotException; 054import org.apache.hadoop.hbase.ExtendedCellScannable; 055import org.apache.hadoop.hbase.ExtendedCellScanner; 056import org.apache.hadoop.hbase.HBaseIOException; 057import org.apache.hadoop.hbase.HBaseRpcServicesBase; 058import org.apache.hadoop.hbase.HConstants; 059import org.apache.hadoop.hbase.MultiActionResultTooLarge; 060import org.apache.hadoop.hbase.NotServingRegionException; 061import org.apache.hadoop.hbase.PrivateCellUtil; 062import org.apache.hadoop.hbase.RegionTooBusyException; 063import org.apache.hadoop.hbase.Server; 064import org.apache.hadoop.hbase.ServerName; 065import org.apache.hadoop.hbase.TableName; 066import org.apache.hadoop.hbase.UnknownScannerException; 067import org.apache.hadoop.hbase.client.Append; 068import org.apache.hadoop.hbase.client.CheckAndMutate; 069import org.apache.hadoop.hbase.client.CheckAndMutateResult; 070import org.apache.hadoop.hbase.client.ClientInternalHelper; 071import org.apache.hadoop.hbase.client.Delete; 072import org.apache.hadoop.hbase.client.Durability; 073import org.apache.hadoop.hbase.client.Get; 074import org.apache.hadoop.hbase.client.Increment; 075import org.apache.hadoop.hbase.client.Mutation; 076import org.apache.hadoop.hbase.client.OperationWithAttributes; 077import org.apache.hadoop.hbase.client.Put; 078import org.apache.hadoop.hbase.client.RegionInfo; 079import org.apache.hadoop.hbase.client.RegionReplicaUtil; 080import org.apache.hadoop.hbase.client.Result; 081import org.apache.hadoop.hbase.client.Row; 082import org.apache.hadoop.hbase.client.Scan; 083import org.apache.hadoop.hbase.client.TableDescriptor; 084import org.apache.hadoop.hbase.client.VersionInfoUtil; 085import 
org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 086import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 087import org.apache.hadoop.hbase.exceptions.ScannerResetException; 088import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 089import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 090import org.apache.hadoop.hbase.io.ByteBuffAllocator; 091import org.apache.hadoop.hbase.io.hfile.BlockCache; 092import org.apache.hadoop.hbase.ipc.HBaseRpcController; 093import org.apache.hadoop.hbase.ipc.PriorityFunction; 094import org.apache.hadoop.hbase.ipc.QosPriority; 095import org.apache.hadoop.hbase.ipc.RpcCall; 096import org.apache.hadoop.hbase.ipc.RpcCallContext; 097import org.apache.hadoop.hbase.ipc.RpcCallback; 098import org.apache.hadoop.hbase.ipc.RpcServer; 099import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 100import org.apache.hadoop.hbase.ipc.RpcServerFactory; 101import org.apache.hadoop.hbase.ipc.RpcServerInterface; 102import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 103import org.apache.hadoop.hbase.ipc.ServerRpcController; 104import org.apache.hadoop.hbase.net.Address; 105import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 106import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; 107import org.apache.hadoop.hbase.quotas.OperationQuota; 108import org.apache.hadoop.hbase.quotas.QuotaUtil; 109import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 110import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 111import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 112import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; 113import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease; 114import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException; 115import org.apache.hadoop.hbase.regionserver.Region.Operation; 116import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 
117import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 118import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler; 119import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; 120import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; 121import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; 122import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; 123import org.apache.hadoop.hbase.replication.ReplicationUtils; 124import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker; 125import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker; 126import org.apache.hadoop.hbase.security.Superusers; 127import org.apache.hadoop.hbase.security.access.Permission; 128import org.apache.hadoop.hbase.util.Bytes; 129import org.apache.hadoop.hbase.util.DNS; 130import org.apache.hadoop.hbase.util.DNS.ServerType; 131import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 132import org.apache.hadoop.hbase.util.Pair; 133import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 134import org.apache.hadoop.hbase.wal.WAL; 135import org.apache.hadoop.hbase.wal.WALEdit; 136import org.apache.hadoop.hbase.wal.WALKey; 137import org.apache.hadoop.hbase.wal.WALSplitUtil; 138import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 139import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 140import org.apache.yetus.audience.InterfaceAudience; 141import org.slf4j.Logger; 142import org.slf4j.LoggerFactory; 143 144import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 145import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 146import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 147import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 148import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 149import 
org.apache.hbase.thirdparty.com.google.protobuf.Message; 150import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 151import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 152import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 153import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 154import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 155 156import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 157import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 158import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 172import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 192import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 213import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; 235import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 247 248/** 249 * Implements the regionserver RPC services. 250 */ 251@InterfaceAudience.Private 252public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer> 253 implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface { 254 255 private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class); 256 257 /** RPC scheduler to use for the region server. */ 258 public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = 259 "hbase.region.server.rpc.scheduler.factory.class"; 260 261 /** 262 * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This 263 * configuration exists to prevent the scenario where a time limit is specified to be so 264 * restrictive that the time limit is reached immediately (before any cells are scanned). 
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";

  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   * (milliseconds).
   */
  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Upper bound on the cumulative result size a scan RPC may return; volatile — NOTE(review):
  // presumably updatable at runtime (e.g. via online configuration change), confirm.
  private volatile long maxScannerResultSize;

  // Generates ids for client scanners. Not final: assigned after construction rather than in
  // the constructor (NOTE(review): initialization point is outside this chunk).
  private ScannerIdGenerator scannerIdGenerator;
  // All currently open client scanners, keyed by scanner name.
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name and last sequence number of a closed scanner for a while. This is used
  // to keep compatible for old clients which may send next or close request to a region
  // scanner which has already been exhausted. The entries will be removed automatically
  // after scannerLeaseTimeoutPeriod.
  private final Cache<String, Long> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private volatile int rowSizeWarnThreshold;
  /*
   * Whether we should reject requests with very high no of rows i.e. beyond threshold defined by
   * rowSizeWarnThreshold
   */
  private volatile boolean rejectRowsWithSizeOverThreshold;

  // NOTE(review): flag guarding the clear-compaction-queues RPC (set while a clear is in
  // progress so concurrent requests are rejected) — confirm against the RPC handler.
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below booleans
   * to selectively enable/disable these services (Rare is the case where you would ever turn off
   * one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
    "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
    "hbase.regionserver.client.executorService";
  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
    "hbase.regionserver.client.meta.executorService";
  public static final String REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG =
    "hbase.regionserver.bootstrap.nodes.executorService";

  /**
   * An Rpc callback for closing a RegionScanner.
356 */ 357 private static final class RegionScannerCloseCallBack implements RpcCallback { 358 359 private final RegionScanner scanner; 360 361 public RegionScannerCloseCallBack(RegionScanner scanner) { 362 this.scanner = scanner; 363 } 364 365 @Override 366 public void run() throws IOException { 367 this.scanner.close(); 368 } 369 } 370 371 /** 372 * An Rpc callback for doing shipped() call on a RegionScanner. 373 */ 374 private class RegionScannerShippedCallBack implements RpcCallback { 375 private final String scannerName; 376 private final Shipper shipper; 377 private final Lease lease; 378 379 public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) { 380 this.scannerName = scannerName; 381 this.shipper = shipper; 382 this.lease = lease; 383 } 384 385 @Override 386 public void run() throws IOException { 387 this.shipper.shipped(); 388 // We're done. On way out re-add the above removed lease. The lease was temp removed for this 389 // Rpc call and we are at end of the call now. Time to add it back. 390 if (scanners.containsKey(scannerName)) { 391 if (lease != null) { 392 server.getLeaseManager().addLease(lease); 393 } 394 } 395 } 396 } 397 398 /** 399 * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on 400 * completion of multiGets. 
   */
  static class RegionScannersCloseCallBack implements RpcCallback {
    // Scanners accumulated while serving a multi-get; all closed when the RPC completes.
    private final List<RegionScanner> scanners = new ArrayList<>();

    public void addScanner(RegionScanner scanner) {
      this.scanners.add(scanner);
    }

    @Override
    public void run() {
      for (RegionScanner scanner : scanners) {
        try {
          scanner.close();
        } catch (IOException e) {
          // Best effort: log and keep closing the remaining scanners even if one fails.
          LOG.error("Exception while closing the scanner " + scanner, e);
        }
      }
    }
  }

  /** Bundles a scanner's name, its holder and the quota checked out for the current call. */
  static class RegionScannerContext {
    final String scannerName;
    final RegionScannerHolder holder;
    final OperationQuota quota;

    RegionScannerContext(String scannerName, RegionScannerHolder holder, OperationQuota quota) {
      this.scannerName = scannerName;
      this.holder = holder;
      this.quota = quota;
    }
  }

  /**
   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
   */
  static final class RegionScannerHolder {
    // Sequence number the next scan request must present; bumped via CAS in incNextCallSeq.
    private final AtomicLong nextCallSeq = new AtomicLong(0);
    // The scanner itself.
    private final RegionScanner s;
    // The region the scanner reads from.
    private final HRegion r;
    private final RpcCallback closeCallBack;
    private final RpcCallback shippedCallback;
    private byte[] rowOfLastPartialResult;
    private boolean needCursor;
    private boolean fullRegionScan;
    // Cached for toString(): who opened this scanner.
    private final String clientIPAndPort;
    private final String userName;
    // Block-bytes-scanned accounting across calls on this scanner. NOTE(review): the
    // read-modify-write in updateBlockBytesScanned is not atomic; it appears to rely on at most
    // one in-flight call per scanner (enforced by the incNextCallSeq CAS) — confirm.
    private volatile long maxBlockBytesScanned = 0;
    private volatile long prevBlockBytesScanned = 0;
    private volatile long prevBlockBytesScannedDifference = 0;

    RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
      RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
      String clientIPAndPort, String userName) {
      this.s = s;
      this.r = r;
      this.closeCallBack = closeCallBack;
      this.shippedCallback = shippedCallback;
      this.needCursor = needCursor;
      this.fullRegionScan = fullRegionScan;
      this.clientIPAndPort = clientIPAndPort;
      this.userName = userName;
    }

    long getNextCallSeq() {
      return nextCallSeq.get();
    }

    boolean incNextCallSeq(long currentSeq) {
      // Use CAS to prevent multiple scan request running on the same scanner.
      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
    }

    long getMaxBlockBytesScanned() {
      return maxBlockBytesScanned;
    }

    long getPrevBlockBytesScannedDifference() {
      return prevBlockBytesScannedDifference;
    }

    // Record the cumulative block bytes scanned after a call; tracks the per-call delta and
    // the maximum cumulative value seen.
    void updateBlockBytesScanned(long blockBytesScanned) {
      prevBlockBytesScannedDifference = blockBytesScanned - prevBlockBytesScanned;
      prevBlockBytesScanned = blockBytesScanned;
      if (blockBytesScanned > maxBlockBytesScanned) {
        maxBlockBytesScanned = blockBytesScanned;
      }
    }

    // Should be called only when we need to print lease expired messages otherwise
    // cache the String once made.
    @Override
    public String toString() {
      return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName
        + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString();
    }
  }

  /**
   * Instantiated as a scanner lease.
If the lease times out, the scanner is closed
   */
  private class ScannerListener implements LeaseListener {
    private final String scannerName;

    ScannerListener(final String n) {
      this.scannerName = n;
    }

    @Override
    public void leaseExpired() {
      // Remove the holder first so concurrent next()/close() requests no longer find the scanner.
      RegionScannerHolder rsh = scanners.remove(this.scannerName);
      if (rsh == null) {
        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
        return;
      }
      LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
      server.getMetrics().incrScannerLeaseExpired();
      RegionScanner s = rsh.s;
      HRegion region = null;
      try {
        region = server.getRegion(s.getRegionInfo().getRegionName());
        if (region != null && region.getCoprocessorHost() != null) {
          region.getCoprocessorHost().preScannerClose(s);
        }
      } catch (IOException e) {
        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
      } finally {
        // Close the scanner even if the region lookup or the pre-close hook failed.
        try {
          s.close();
          if (region != null && region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().postScannerClose(s);
          }
        } catch (IOException e) {
          LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
        }
      }
    }
  }

  // Convenience overloads that wrap a proto Result / Exception into a ResultOrException
  // carrying the index of the action it answers.
  private static ResultOrException getResultOrException(final ClientProtos.Result r,
    final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(r), index);
  }

  private static ResultOrException getResultOrException(final Exception e, final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(e), index);
  }

  private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
    final int index) {
    return builder.setIndex(index).build();
  }

  /**
   * Checks for the following pre-checks in order:
   * <ol>
   * <li>RegionServer is running</li>
   * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
   * </ol>
 * @param requestName name of rpc request. Used in reporting failures to provide context.
   * @throws ServiceException If any of the above listed pre-check fails.
   */
  private void rpcPreCheck(String requestName) throws ServiceException {
    try {
      checkOpen();
      requirePermission(requestName, Permission.Action.ADMIN);
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
  }

  /** Returns true if the client of this call supports cell-block (cell scanner) payloads. */
  private boolean isClientCellBlockSupport(RpcCallContext context) {
    return context != null && context.isClientCellBlockSupported();
  }

  /**
   * Add the given mutation result to the response: as a cell block (proto carries only metadata,
   * cells travel in the RPC payload) when the client supports it, else as a full proto Result.
   */
  private void addResult(final MutateResponse.Builder builder, final Result result,
    final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
    if (result == null) return;
    if (clientCellBlockSupported) {
      builder.setResult(ProtobufUtil.toResultNoData(result));
      rpcc.setCellScanner(result.cellScanner());
    } else {
      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
      builder.setResult(pbr);
    }
  }

  /**
   * Add scan results to the response. Results from a non-default region replica are marked
   * stale. As with {@link #addResult}, cells go out as a cell block when the client supports it.
   */
  private void addResults(ScanResponse.Builder builder, List<Result> results,
    HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
    builder.setStale(!isDefaultRegion);
    if (results.isEmpty()) {
      return;
    }
    if (clientCellBlockSupported) {
      for (Result res : results) {
        builder.addCellsPerResult(res.size());
        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
      }
      controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(results));
    } else {
      for (Result res : results) {
        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
        builder.addResults(pbr);
      }
    }
  }

  /**
   * Execute a check-and-mutate: apply the given mutation actions atomically, guarded by
   * {@code condition}. {@code countOfCompleteMutation} counts how many actions have consumed
   * their cells from {@code cellScanner}, so the finally block can skip the cells of any
   * unprocessed actions and leave the scanner positioned consistently.
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
    CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    int countOfCompleteMutation = 0;
    try {
      if
(!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      List<Mutation> mutations = new ArrayList<>();
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : actions) {
        // Only mutations are allowed here; a Get cannot take part in an atomic mutate.
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto mutation = action.getMutation();
        MutationType type = mutation.getMutateType();
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(mutation, cellScanner);
            // Count as soon as the cells are consumed from the scanner; the finally block
            // skips cells only for actions that never got this far.
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            mutations.add(put);
            break;
          case DELETE:
            Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
            ++countOfCompleteMutation;
            // NOTE(review): no checkCellSizeLimit for deletes, unlike PUT/INCREMENT/APPEND —
            // presumably delete markers carry no large values; confirm this is intentional.
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            mutations.add(del);
            break;
          case INCREMENT:
            Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
            // Only increments and appends carry a nonce in this method.
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, increment);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
            mutations.add(increment);
            break;
          case APPEND:
            Append append = ProtobufUtil.toAppend(mutation, cellScanner);
            nonce = mutation.hasNonce() ?
mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, append);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
            mutations.add(append);
            break;
          default:
            throw new DoNotRetryIOException("invalid mutation type : " + type);
        }
      }

      if (mutations.size() == 0) {
        // Nothing to apply; report the (trivially satisfied) condition check as a success.
        return new CheckAndMutateResult(true, null);
      } else {
        CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
        CheckAndMutateResult result = null;
        if (region.getCoprocessorHost() != null) {
          result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
        }
        // A non-null result here means a coprocessor bypassed the default operation.
        if (result == null) {
          result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
          if (region.getCoprocessorHost() != null) {
            result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
          }
        }
        return result;
      }
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }

  /**
   * Execute an append mutation.
   * @return result to return to client if default operation should be bypassed as indicated by
   *         RegionObserver, null otherwise
   */
  private Result append(final HRegion region, final OperationQuota quota,
    final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
    checkCellSizeLimit(region, append);
    spaceQuota.getPolicyEnforcement(region).check(append);
    quota.addMutation(append);
    // Snapshot block bytes scanned so the metrics update below reflects only this operation.
    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    Result r = region.append(append, nonceGroup, nonce);
    if (server.getMetrics() != null) {
      long blockBytesScanned =
        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
      server.getMetrics().updateAppend(region, EnvironmentEdgeManager.currentTime() - before,
        blockBytesScanned);
    }
    return r == null ? Result.EMPTY_RESULT : r;
  }

  /**
   * Execute an increment mutation.
   */
  private Result increment(final HRegion region, final OperationQuota quota,
    final MutationProto mutation, final CellScanner cells, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
    checkCellSizeLimit(region, increment);
    spaceQuota.getPolicyEnforcement(region).check(increment);
    quota.addMutation(increment);
    // Snapshot block bytes scanned so the metrics update below reflects only this operation.
    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    Result r = region.increment(increment, nonceGroup, nonce);
    final MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long blockBytesScanned =
        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
      metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before,
        blockBytesScanned);
    }
    return r == null ? Result.EMPTY_RESULT : r;
  }

  /**
   * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
   * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
   * @param cellsToReturn Could be null. May be allocated in this method.
This is what this method
   *                      returns as a 'result'.
   * @param closeCallBack the callback to be used with multigets
   * @param context       the current RpcCallContext
   * @return Return the <code>cellScanner</code> passed
   */
  private List<ExtendedCellScannable> doNonAtomicRegionMutation(final HRegion region,
    final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
    final RegionActionResult.Builder builder, List<ExtendedCellScannable> cellsToReturn,
    long nonceGroup, final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
    // one at a time, we instead pass them in batch. Be aware that the corresponding
    // ResultOrException instance that matches each Put or Delete is then added down in the
    // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
    // deferred/batched
    List<ClientProtos.Action> mutations = null;
    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
    IOException sizeIOE = null;
    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
      ResultOrException.newBuilder();
    boolean hasResultOrException = false;
    for (ClientProtos.Action action : actions.getActionList()) {
      hasResultOrException = false;
      resultOrExceptionBuilder.clear();
      try {
        Result r = null;
        long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
        if (
          context != null && context.isRetryImmediatelySupported()
            && (context.getResponseCellSize() > maxQuotaResultSize
              || blockBytesScannedBefore + context.getResponseExceptionSize() > maxQuotaResultSize)
        ) {

          // We're storing the exception since the exception and reason string won't
          // change after the response size limit is reached.
          if (sizeIOE == null) {
            // We don't need the stack un-winding so don't throw the exception.
            // Throwing will kill the JVM's JIT.
            //
            // Instead just create the exception and then store it.
            sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
              + context.getResponseCellSize() + " BlockSize: " + blockBytesScannedBefore);

            // Only report the exception once since there's only one request that
            // caused the exception. Otherwise this number will dominate the exceptions count.
            rpcServer.getMetrics().exception(sizeIOE);
          }

          // Now that the exception is known to be created,
          // use it for the response.
          //
          // This will create a copy in the builder.
          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
          resultOrExceptionBuilder.setException(pair);
          context.incrementResponseExceptionSize(pair.getSerializedSize());
          resultOrExceptionBuilder.setIndex(action.getIndex());
          builder.addResultOrException(resultOrExceptionBuilder.build());
          // Keep the shared cell scanner aligned with the actions we are skipping.
          skipCellsForMutation(action, cellScanner);
          continue;
        }
        if (action.hasGet()) {
          long before = EnvironmentEdgeManager.currentTime();
          ClientProtos.Get pbGet = action.getGet();
          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
          // they are; it's a problem for non-native clients like asynchbase. HBASE-20225.
          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
              + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
              + "reverse Scan.");
          }
          try {
            Get get = ProtobufUtil.toGet(pbGet);
            if (context != null) {
              r = get(get, (region), closeCallBack, context);
            } else {
              r = region.get(get);
            }
          } finally {
            final MetricsRegionServer metricsRegionServer = server.getMetrics();
            if (metricsRegionServer != null) {
              long blockBytesScanned =
                context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
              metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
                blockBytesScanned);
            }
          }
        } else if (action.hasServiceCall()) {
          hasResultOrException = true;
          Message result = execServiceOnRegion(region, action.getServiceCall());
          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
            ClientProtos.CoprocessorServiceResult.newBuilder();
          resultOrExceptionBuilder.setServiceResult(serviceResultBuilder
            .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName())
              // TODO: Copy!!!
              .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
        } else if (action.hasMutation()) {
          MutationType type = action.getMutation().getMutateType();
          if (
            type != MutationType.PUT && type != MutationType.DELETE && mutations != null
              && !mutations.isEmpty()
          ) {
            // Flush out any Puts or Deletes already collected.
            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
              spaceQuotaEnforcement);
            mutations.clear();
          }
          switch (type) {
            case APPEND:
              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement, context);
              break;
            case INCREMENT:
              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement, context);
              break;
            case PUT:
            case DELETE:
              // Collect the individual mutations and apply in a batch
              if (mutations == null) {
                mutations = new ArrayList<>(actions.getActionCount());
              }
              mutations.add(action);
              break;
            default:
              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
          }
        } else {
          throw new HBaseIOException("Unexpected Action type");
        }
        if (r != null) {
          ClientProtos.Result pbResult = null;
          if (isClientCellBlockSupport(context)) {
            pbResult = ProtobufUtil.toResultNoData(r);
            // Hard to guess the size here. Just make a rough guess.
            if (cellsToReturn == null) {
              cellsToReturn = new ArrayList<>();
            }
            cellsToReturn.add(r);
          } else {
            pbResult = ProtobufUtil.toResult(r);
          }
          addSize(context, r);
          hasResultOrException = true;
          resultOrExceptionBuilder.setResult(pbResult);
        }
        // Could get to here and there was no result and no exception. Presumes we added
        // a Put or Delete to the collecting Mutations List for adding later. In this
        // case the corresponding ResultOrException instance for the Put or Delete will be added
        // down in the doNonAtomicBatchOp method call rather than up here.
} catch (IOException ie) {
        rpcServer.getMetrics().exception(ie);
        hasResultOrException = true;
        NameBytesPair pair = ResponseConverter.buildException(ie);
        resultOrExceptionBuilder.setException(pair);
        context.incrementResponseExceptionSize(pair.getSerializedSize());
      }
      if (hasResultOrException) {
        // Propagate index.
        resultOrExceptionBuilder.setIndex(action.getIndex());
        builder.addResultOrException(resultOrExceptionBuilder.build());
      }
    }
    // Finish up any outstanding mutations
    if (!CollectionUtils.isEmpty(mutations)) {
      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
    }
    return cellsToReturn;
  }

  /**
   * Reject any single cell in {@code m} larger than the region's configured max cell size.
   * @throws DoNotRetryIOException if a cell exceeds the region's {@code maxCellSize}
   */
  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
    if (r.maxCellSize > 0) {
      CellScanner cells = m.cellScanner();
      while (cells.advance()) {
        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
        if (size > r.maxCellSize) {
          String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of "
            + r.maxCellSize + " bytes";
          LOG.debug(msg);
          throw new DoNotRetryIOException(msg);
        }
      }
    }
  }

  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Just throw the exception. The exception will be caught and then added to region-level
    // exception for RegionAction. Leaving the null to action result is ok since the null
    // result is viewed as failure by hbase client. And the region-level exception will be used
    // to replace the null result. see AsyncRequestFutureImpl#receiveMultiAction and
    // AsyncBatchRpcRetryingCaller#onComplete for more details.
    doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
  }

  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    try {
      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
        spaceQuotaEnforcement, false);
    } catch (IOException e) {
      // Set the exception for each action. The mutations in the same RegionAction are grouped
      // into different batches and then processed individually. Hence, we don't set the
      // region-level exception here for the whole RegionAction.
      for (Action mutation : mutations) {
        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
      }
    }
  }

  /**
   * Execute a list of mutations.
   */
  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
    throws IOException {
    Mutation[] mArray = new Mutation[mutations.size()];
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      /**
       * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions
       * since mutation array may have been reordered. In order to return the right result or
       * exception to the corresponding actions, We need to know which action the mutation belongs
       * to. We can't sort ClientProtos.Action array, since they are bonded to cellscanners.
*/
      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
      int i = 0;
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : mutations) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto m = action.getMutation();
        Mutation mutation;
        switch (m.getMutateType()) {
          case PUT:
            mutation = ProtobufUtil.toPut(m, cells);
            batchContainsPuts = true;
            break;

          case DELETE:
            mutation = ProtobufUtil.toDelete(m, cells);
            batchContainsDelete = true;
            break;

          case INCREMENT:
            mutation = ProtobufUtil.toIncrement(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          case APPEND:
            mutation = ProtobufUtil.toAppend(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          default:
            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
        }
        mutationActionMap.put(mutation, action);
        mArray[i++] = mutation;
        checkCellSizeLimit(region, mutation);
        // Check if a space quota disallows this mutation
        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
        quota.addMutation(mutation);
      }

      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }

      // HBASE-17924
      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic,
      // order is preserved as it's expected from the client
      if (!atomic) {
        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
      }

      OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);

      // When atomic is true, it indicates that the mutateRow API or the batch API with
      // RowMutations is called. In this case, we need to merge the results of the
      // Increment/Append operations if the mutations include those operations, and set the merged
      // result to the first element of the ResultOrException list
      if (atomic) {
        List<ResultOrException> resultOrExceptions = new ArrayList<>();
        List<Result> results = new ArrayList<>();
        for (i = 0; i < codes.length; i++) {
          if (codes[i].getResult() != null) {
            results.add(codes[i].getResult());
          }
          if (i != 0) {
            resultOrExceptions
              .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i));
          }
        }

        if (results.isEmpty()) {
          builder.addResultOrException(
            getResultOrException(ClientProtos.Result.getDefaultInstance(), 0));
        } else {
          // Merge the results of the Increment/Append operations
          List<Cell> cellList = new ArrayList<>();
          for (Result result : results) {
            if (result.rawCells() != null) {
              cellList.addAll(Arrays.asList(result.rawCells()));
            }
          }
          Result result = Result.create(cellList);

          // Set the merged result of the Increment/Append operations to the first element of the
          // ResultOrException list
          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
        }

        builder.addAllResultOrException(resultOrExceptions);
        return;
      }

      for (i = 0; i < codes.length; i++) {
        Mutation currentMutation = mArray[i];
        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
        Exception e;
        switch (codes[i].getOperationStatusCode()) {
          case BAD_FAMILY:
            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SANITY_CHECK_FAILURE:
            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          default:
            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SUCCESS:
            ClientProtos.Result result = codes[i].getResult() == null
              ? ClientProtos.Result.getDefaultInstance()
              : ProtobufUtil.toResult(codes[i].getResult());
            builder.addResultOrException(getResultOrException(result, index));
            break;

          case STORE_TOO_BUSY:
            e = new RegionTooBusyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;
        }
      }
    } finally {
      int processedMutationIndex = 0;
      for (Action mutation : mutations) {
        // The non-null mArray[i] means the cell scanner has been read.
if (mArray[processedMutationIndex++] == null) {
          skipCellsForMutation(mutation, cells);
        }
      }
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  /**
   * Update put/delete batch latency metrics for the region, if metrics are enabled.
   */
  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
    boolean batchContainsDelete) {
    final MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      if (batchContainsPuts) {
        metricsRegionServer.updatePutBatch(region, after - starttime);
      }
      if (batchContainsDelete) {
        metricsRegionServer.updateDeleteBatch(region, after - starttime);
      }
    }
  }

  /**
   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
   *         exceptionMessage if any
   * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying
   *             edits for secondary replicas any more, see
   *             {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}.
   */
  @Deprecated
  private OperationStatus[] doReplayBatchOp(final HRegion region,
    final List<MutationReplay> mutations, long replaySeqId) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
        MutationReplay m = it.next();

        if (m.getType() == MutationType.PUT) {
          batchContainsPuts = true;
        } else {
          batchContainsDelete = true;
        }

        // Entries under METAFAMILY are markers (compaction/flush/region-event/bulk-load),
        // not data edits; replay them via the dedicated marker handlers and drop them from
        // the batch below.
        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
        if (metaCells != null && !metaCells.isEmpty()) {
          for (Cell metaCell : metaCells) {
            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
            HRegion hRegion = region;
            if (compactionDesc != null) {
              // replay the compaction. Remove the files from stores only if we are the primary
              // region replica (thus own the files)
              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
                replaySeqId);
              continue;
            }
            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
            if (flushDesc != null && !isDefaultReplica) {
              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
              continue;
            }
            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
            if (regionEvent != null && !isDefaultReplica) {
              hRegion.replayWALRegionEventMarker(regionEvent);
              continue;
            }
            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
            if (bulkLoadEvent != null) {
              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
              continue;
            }
          }
          it.remove();
        }
      }
      requestCount.increment();
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
        replaySeqId);
    } finally {
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  private void closeAllScanners() {
    // Close any outstanding scanners. Means they'll get an UnknownScanner
    // exception next time they come in.
for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
      try {
        e.getValue().s.close();
      } catch (IOException ioe) {
        // Best effort on shutdown; log and keep closing the remaining scanners.
        LOG.warn("Closing scanner " + e.getKey(), ioe);
      }
    }
  }

  // Directly invoked only for testing
  public RSRpcServices(final HRegionServer rs) throws IOException {
    super(rs, rs.getProcessName());
    final Configuration conf = rs.getConfiguration();
    setReloadableGuardrails(conf);
    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
    rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
    // Closed-scanner ids are remembered for one lease period so late client calls can be
    // distinguished from truly unknown scanners.
    closedScanners = CacheBuilder.newBuilder()
      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }

  @Override
  protected boolean defaultReservoirEnabled() {
    return true;
  }

  @Override
  protected ServerType getDNSServerType() {
    return DNS.ServerType.REGIONSERVER;
  }

  @Override
  protected String getHostname(Configuration conf, String defaultHostname) {
    return conf.get("hbase.regionserver.ipc.address", defaultHostname);
  }

  @Override
  protected String getPortConfigName() {
    return HConstants.REGIONSERVER_PORT;
  }

  @Override
  protected int getDefaultPort() {
    return HConstants.DEFAULT_REGIONSERVER_PORT;
  }

  @Override
  protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  protected RpcServerInterface createRpcServer(final Server server,
    final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
    final String name) throws IOException {
    final Configuration conf = server.getConfiguration();
    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
    try {
      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, // use final
                                                                                        // bindAddress
                                                                                        // for this
                                                                                        // server.
        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
    } catch (BindException be) {
      throw new IOException(be.getMessage() + ". To switch ports use the '"
        + HConstants.REGIONSERVER_PORT + "' configuration property.",
        be.getCause() != null ? be.getCause() : be);
    }
  }

  protected Class<?> getRpcSchedulerFactoryClass() {
    final Configuration conf = server.getConfiguration();
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  protected PriorityFunction createPriority() {
    return new RSAnnotationReadingPriorityFunction(this);
  }

  public int getScannersCount() {
    return scanners.size();
  }

  /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
  RegionScanner getScanner(long scannerId) {
    RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId);
    return rsh == null ? null : rsh.s;
  }

  /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null.
*/
  private RegionScannerHolder checkQuotaAndGetRegionScannerContext(long scannerId) {
    return scanners.get(toScannerName(scannerId));
  }

  public String getScanDetailsWithId(long scannerId) {
    RegionScanner scanner = getScanner(scannerId);
    if (scanner == null) {
      return null;
    }
    StringBuilder builder = new StringBuilder();
    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
    builder.append(" operation_id: ").append(scanner.getOperationId());
    return builder.toString();
  }

  public String getScanDetailsWithRequest(ScanRequest request) {
    try {
      if (!request.hasRegion()) {
        return null;
      }
      Region region = getRegion(request.getRegion());
      StringBuilder builder = new StringBuilder();
      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
      for (NameBytesPair pair : request.getScan().getAttributeList()) {
        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
          break;
        }
      }
      return builder.toString();
    } catch (IOException ignored) {
      // Scan details are for logging only; if the region lookup fails just report nothing.
      return null;
    }
  }

  /**
   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
   */
  long getScannerVirtualTime(long scannerId) {
    RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId);
    return rsh == null ? 0L : rsh.getNextCallSeq();
  }

  /**
   * Method to account for the size of retained cells.
   * @param context rpc call context
   * @param r       result to add size.
   * @return an object that represents the last referenced block from this response.
   */
  void addSize(RpcCallContext context, Result r) {
    if (context != null && r != null && !r.isEmpty()) {
      for (Cell c : r.rawCells()) {
        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
      }
    }
  }

  /** Returns Remote client's ip and port else null if can't be determined. */
  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
  static String getRemoteClientIpAndPort() {
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    if (rpcCall == null) {
      return HConstants.EMPTY_STRING;
    }
    InetAddress address = rpcCall.getRemoteAddress();
    if (address == null) {
      return HConstants.EMPTY_STRING;
    }
    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
    // scanning than a hostname anyways.
    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
  }

  /** Returns Remote client's username.
*/
  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
  static String getUserName() {
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    if (rpcCall == null) {
      return HConstants.EMPTY_STRING;
    }
    return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING);
  }

  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
    HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
    // Tie the scanner's lifetime to a lease so abandoned scanners are eventually reclaimed.
    Lease lease = server.getLeaseManager().createLease(scannerName, this.scannerLeaseTimeoutPeriod,
      new ScannerListener(scannerName));
    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
    RpcCallback closeCallback =
      s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s);
    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
      needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
      + scannerName + ", " + existing;
    return rsh;
  }

  private boolean isFullRegionScan(Scan scan, HRegion region) {
    // If the scan start row equals or less than the start key of the region
    // and stop row greater than equals end key (if stop row present)
    // or if the stop row is empty
    // account this as a full region scan
    if (
      Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0
        && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0
          && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)
          || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
    ) {
      return true;
    }
    return false;
  }

  /**
   * Find the HRegion based on a region specifier
   * @param regionSpecifier the region specifier
   * @return the corresponding region
   * @throws IOException if the specifier is not null, but failed to find the region
   */
  public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException {
    return server.getRegion(regionSpecifier.getValue().toByteArray());
  }

  /**
   * Find the List of HRegions based on a list of region specifiers
   * @param regionSpecifiers the list of region specifiers
   * @return the corresponding list of regions
   * @throws IOException if any of the specifiers is not null, but failed to find the region
   */
  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
    final CacheEvictionStatsBuilder stats) {
    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
    for (RegionSpecifier regionSpecifier : regionSpecifiers) {
      try {
        regions.add(server.getRegion(regionSpecifier.getValue().toByteArray()));
      } catch (NotServingRegionException e) {
        // Record the miss in the stats and continue with the remaining specifiers.
        stats.addException(regionSpecifier.getValue().toByteArray(), e);
      }
    }
    return regions;
  }
PriorityFunction getPriority() {
    return priority;
  }

  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return server.getRegionServerRpcQuotaManager();
  }

  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return server.getRegionServerSpaceQuotaManager();
  }

  void start(ZKWatcher zkWatcher) {
    this.scannerIdGenerator = new ScannerIdGenerator(this.server.getServerName());
    internalStart(zkWatcher);
  }

  void stop() {
    closeAllScanners();
    internalStop();
  }

  /**
   * Called to verify that this server is up and running.
   */
  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
  protected void checkOpen() throws IOException {
    if (server.isAborted()) {
      throw new RegionServerAbortedException("Server " + server.getServerName() + " aborting");
    }
    if (server.isStopped()) {
      throw new RegionServerStoppedException("Server " + server.getServerName() + " stopping");
    }
    if (!server.isDataFileSystemOk()) {
      throw new RegionServerStoppedException("File system not available");
    }
    if (!server.isOnline()) {
      throw new ServerNotRunningYetException(
        "Server " + server.getServerName() + " is not running yet");
    }
  }

  /**
   * By default, put up an Admin and a Client Service. Set booleans
   * <code>hbase.regionserver.admin.executorService</code> and
   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
   * Default is that both are enabled.
   * @return immutable list of blocking services and the security info classes that this server
   *         supports
   */
  protected List<BlockingServiceAndInterface> getServices() {
    boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
    boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
    boolean clientMeta =
      getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
    boolean bootstrapNodes =
      getConfiguration().getBoolean(REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG, true);
    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
    if (client) {
      bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
        ClientService.BlockingInterface.class));
    }
    if (admin) {
      bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
        AdminService.BlockingInterface.class));
    }
    if (clientMeta) {
      bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
        ClientMetaService.BlockingInterface.class));
    }
    if (bootstrapNodes) {
      bssi.add(
        new BlockingServiceAndInterface(BootstrapNodeService.newReflectiveBlockingService(this),
          BootstrapNodeService.BlockingInterface.class));
    }
    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
  }

  /**
   * Close a region on the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public CloseRegionResponse closeRegion(final RpcController controller,
    final CloseRegionRequest request) throws ServiceException {
    final ServerName sn = (request.hasDestinationServer()
      ?
ProtobufUtil.toServerName(request.getDestinationServer()) 1524 : null); 1525 1526 try { 1527 checkOpen(); 1528 throwOnWrongStartCode(request); 1529 final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion()); 1530 1531 requestCount.increment(); 1532 if (sn == null) { 1533 LOG.info("Close " + encodedRegionName + " without moving"); 1534 } else { 1535 LOG.info("Close " + encodedRegionName + ", moving to " + sn); 1536 } 1537 boolean closed = server.closeRegion(encodedRegionName, false, sn); 1538 CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed); 1539 return builder.build(); 1540 } catch (IOException ie) { 1541 throw new ServiceException(ie); 1542 } 1543 } 1544 1545 /** 1546 * Compact a region on the region server. 1547 * @param controller the RPC controller 1548 * @param request the request 1549 */ 1550 @Override 1551 @QosPriority(priority = HConstants.ADMIN_QOS) 1552 public CompactRegionResponse compactRegion(final RpcController controller, 1553 final CompactRegionRequest request) throws ServiceException { 1554 try { 1555 checkOpen(); 1556 requestCount.increment(); 1557 HRegion region = getRegion(request.getRegion()); 1558 // Quota support is enabled, the requesting user is not system/super user 1559 // and a quota policy is enforced that disables compactions. 
      if (
        QuotaUtil.isQuotaEnabled(getConfiguration())
          && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null))
          && this.server.getRegionServerSpaceQuotaManager()
            .areCompactionsDisabled(region.getTableDescriptor().getTableName())
      ) {
        throw new DoNotRetryIOException(
          "Compactions on this region are " + "disabled due to a space quota violation.");
      }
      region.startRegionOperation(Operation.COMPACT_REGION);
      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
      boolean major = request.hasMajor() && request.getMajor();
      if (request.hasFamily()) {
        // Family-scoped compaction: only the requested column family is compacted.
        byte[] family = request.getFamily().toByteArray();
        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
          + region.getRegionInfo().getRegionNameAsString() + " and family "
          + Bytes.toString(family);
        LOG.trace(log);
        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
          CompactionLifeCycleTracker.DUMMY);
      } else {
        // Region-wide compaction across all families.
        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
          + region.getRegionInfo().getRegionNameAsString();
        LOG.trace(log);
        region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
      }
      return CompactRegionResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Switch compactions on or off on this region server. The response carries the state that was
   * in effect before this call.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public CompactionSwitchResponse compactionSwitch(RpcController controller,
    CompactionSwitchRequest request) throws ServiceException {
    rpcPreCheck("compactionSwitch");
    final CompactSplit compactSplitThread = server.getCompactSplitThread();
    requestCount.increment();
    boolean prevState = compactSplitThread.isCompactionsEnabled();
    CompactionSwitchResponse response =
      CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
    if (prevState == request.getEnabled()) {
      // passed in requested state is same as current state. No action required
      return response;
    }
    compactSplitThread.switchCompaction(request.getEnabled());
    return response;
  }

  /**
   * Flush a region on the region server.
1612 * @param controller the RPC controller 1613 * @param request the request 1614 */ 1615 @Override 1616 @QosPriority(priority = HConstants.ADMIN_QOS) 1617 public FlushRegionResponse flushRegion(final RpcController controller, 1618 final FlushRegionRequest request) throws ServiceException { 1619 try { 1620 checkOpen(); 1621 requestCount.increment(); 1622 HRegion region = getRegion(request.getRegion()); 1623 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); 1624 boolean shouldFlush = true; 1625 if (request.hasIfOlderThanTs()) { 1626 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs(); 1627 } 1628 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); 1629 if (shouldFlush) { 1630 boolean writeFlushWalMarker = 1631 request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false; 1632 // Go behind the curtain so we can manage writing of the flush WAL marker 1633 HRegion.FlushResultImpl flushResult = null; 1634 if (request.hasFamily()) { 1635 List<byte[]> families = new ArrayList(); 1636 families.add(request.getFamily().toByteArray()); 1637 TableDescriptor tableDescriptor = region.getTableDescriptor(); 1638 List<String> noSuchFamilies = families.stream() 1639 .filter(f -> !tableDescriptor.hasColumnFamily(f)).map(Bytes::toString).toList(); 1640 if (!noSuchFamilies.isEmpty()) { 1641 throw new NoSuchColumnFamilyException("Column families " + noSuchFamilies 1642 + " don't exist in table " + tableDescriptor.getTableName().getNameAsString()); 1643 } 1644 flushResult = 1645 region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1646 } else { 1647 flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1648 } 1649 boolean compactionNeeded = flushResult.isCompactionNeeded(); 1650 if (compactionNeeded) { 1651 server.getCompactSplitThread().requestSystemCompaction(region, 1652 "Compaction through user triggered flush"); 1653 } 1654 
builder.setFlushed(flushResult.isFlushSucceeded()); 1655 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker); 1656 } 1657 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores()); 1658 return builder.build(); 1659 } catch (DroppedSnapshotException ex) { 1660 // Cache flush can fail in a few places. If it fails in a critical 1661 // section, we get a DroppedSnapshotException and a replay of wal 1662 // is required. Currently the only way to do this is a restart of 1663 // the server. 1664 server.abort("Replay of WAL required. Forcing server shutdown", ex); 1665 throw new ServiceException(ex); 1666 } catch (IOException ie) { 1667 throw new ServiceException(ie); 1668 } 1669 } 1670 1671 @Override 1672 @QosPriority(priority = HConstants.ADMIN_QOS) 1673 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, 1674 final GetOnlineRegionRequest request) throws ServiceException { 1675 try { 1676 checkOpen(); 1677 requestCount.increment(); 1678 Map<String, HRegion> onlineRegions = server.getOnlineRegions(); 1679 List<RegionInfo> list = new ArrayList<>(onlineRegions.size()); 1680 for (HRegion region : onlineRegions.values()) { 1681 list.add(region.getRegionInfo()); 1682 } 1683 list.sort(RegionInfo.COMPARATOR); 1684 return ResponseConverter.buildGetOnlineRegionResponse(list); 1685 } catch (IOException ie) { 1686 throw new ServiceException(ie); 1687 } 1688 } 1689 1690 // Master implementation of this Admin Service differs given it is not 1691 // able to supply detail only known to RegionServer. See note on 1692 // MasterRpcServers#getRegionInfo. 
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
    final GetRegionInfoRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      RegionInfo info = region.getRegionInfo();
      byte[] bestSplitRow;
      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
        bestSplitRow = region.checkSplit(true).orElse(null);
        // when all table data are in memstore, bestSplitRow = null
        // try to flush region first
        if (bestSplitRow == null) {
          region.flush(true);
          bestSplitRow = region.checkSplit(true).orElse(null);
        }
      } else {
        bestSplitRow = null;
      }
      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
      if (request.hasCompactionState() && request.getCompactionState()) {
        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
      }
      builder.setSplittable(region.isSplittable());
      builder.setMergeable(region.isMergeable());
      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Report the RegionLoad of either every online region, or only the regions of the table named
   * in the request.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
    throws ServiceException {

    List<HRegion> regions;
    if (request.hasTableName()) {
      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
      regions = server.getRegions(tableName);
    } else {
      regions = server.getRegions();
    }
    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
    // Builders are reused across regions while assembling the loads.
    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();

    try {
      for (HRegion region : regions) {
        rLoads.add(server.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
      }
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
    builder.addAllRegionLoads(rLoads);
    return builder.build();
  }

  /**
   * Clear the named compaction queues ("long" and/or "short") on this server. Only one such
   * request runs at a time; concurrent requests are ignored with a warning.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
    ClearCompactionQueuesRequest request) throws ServiceException {
    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
      + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
    requestCount.increment();
    // compareAndSet guards against two admins clearing the queues concurrently.
    if (clearCompactionQueues.compareAndSet(false, true)) {
      final CompactSplit compactSplitThread = server.getCompactSplitThread();
      try {
        checkOpen();
        server.getRegionServerCoprocessorHost().preClearCompactionQueues();
        for (String queueName : request.getQueueNameList()) {
          LOG.debug("clear " + queueName + " compaction queue");
          switch (queueName) {
            case "long":
              compactSplitThread.clearLongCompactionsQueue();
              break;
            case "short":
              compactSplitThread.clearShortCompactionsQueue();
              break;
            default:
              LOG.warn("Unknown queue name " + queueName);
              throw new IOException("Unknown queue name " + queueName);
          }
        }
        server.getRegionServerCoprocessorHost().postClearCompactionQueues();
      } catch (IOException ie) {
        throw new ServiceException(ie);
      } finally {
        // Always release the guard so later requests can proceed.
        clearCompactionQueues.set(false);
      }
    } else {
      LOG.warn("Clear compactions queue is executing by other admin.");
    }
    return respBuilder.build();
  }

  /**
   * Get some information of the region server.
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetServerInfoResponse getServerInfo(final RpcController controller,
    final GetServerInfoRequest request) throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
    requestCount.increment();
    // -1 signals that no info server is running.
    int infoPort = server.getInfoServer() != null ? server.getInfoServer().getPort() : -1;
    return ResponseConverter.buildGetServerInfoResponse(server.getServerName(), infoPort);
  }

  /**
   * List the store file names for the requested region, restricted to the requested column
   * families (all families of the table when none are given).
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetStoreFileResponse getStoreFile(final RpcController controller,
    final GetStoreFileRequest request) throws ServiceException {
    try {
      checkOpen();
      HRegion region = getRegion(request.getRegion());
      requestCount.increment();
      Set<byte[]> columnFamilies;
      if (request.getFamilyCount() == 0) {
        // No families requested: cover every family in the table.
        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
      } else {
        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
        for (ByteString cf : request.getFamilyList()) {
          columnFamilies.add(cf.toByteArray());
        }
      }
      int nCF = columnFamilies.size();
      List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
      builder.addAllStoreFile(fileList);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      // Older callers may omit the start code; accept the request in that case.
      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
    // Older callers may omit the start code; accept the request in that case.
    if (!request.hasServerStartCode()) {
      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
    // check that we are the same server that this RPC is intended for.
    if (server.getServerName().getStartcode() != serverStartCode) {
      throw new ServiceException(new DoNotRetryIOException(
        "This RPC was intended for a " + "different server with startCode: " + serverStartCode
          + ", this server is: " + server.getServerName()));
    }
  }

  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
    // Validate every embedded open and close sub-request against this server's start code.
    if (req.getOpenRegionCount() > 0) {
      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
        throwOnWrongStartCode(openReq);
      }
    }
    if (req.getCloseRegionCount() > 0) {
      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
        throwOnWrongStartCode(closeReq);
      }
    }
  }

  /**
   * Open asynchronously a region or a set of regions on the region server. The opening is
   * coordinated by ZooKeeper, and this method requires the znode to be created before being called.
   * As a consequence, this method should be called only from the master.
   * <p>
   * Different manages states for the region are:
   * </p>
   * <ul>
   * <li>region not opened: the region opening will start asynchronously.</li>
   * <li>a close is already in progress: this is considered as an error.</li>
   * <li>an open is already in progress: this new open request will be ignored.
This is important
   * because the Master can do multiple requests if it crashes.</li>
   * <li>the region is already opened: this new open request will be ignored.</li>
   * </ul>
   * <p>
   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
   * errors are put in the response as FAILED_OPENING.
   * </p>
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public OpenRegionResponse openRegion(final RpcController controller,
    final OpenRegionRequest request) throws ServiceException {
    requestCount.increment();
    throwOnWrongStartCode(request);

    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
    final int regionCount = request.getOpenInfoCount();
    // Cache of table descriptors, keyed by table, shared across the regions in this request.
    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
    final boolean isBulkAssign = regionCount > 1;
    try {
      checkOpen();
    } catch (IOException ie) {
      // Server not yet fully up. Only tolerate this when the single region being assigned is
      // meta (see below); for anything else propagate the failure.
      TableName tableName = null;
      if (regionCount == 1) {
        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
          request.getOpenInfo(0).getRegion();
        if (ri != null) {
          tableName = ProtobufUtil.toTableName(ri.getTableName());
        }
      }
      if (!TableName.META_TABLE_NAME.equals(tableName)) {
        throw new ServiceException(ie);
      }
      // We are assigning meta, wait a little for regionserver to finish initialization.
      // Default to quarter of RPC timeout
      int timeout = server.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
      synchronized (server.online) {
        try {
          while (
            EnvironmentEdgeManager.currentTime() <= endTime && !server.isStopped()
              && !server.isOnline()
          ) {
            server.online.wait(server.getMsgInterval());
          }
          checkOpen();
        } catch (InterruptedException t) {
          // Restore the interrupt flag before surfacing the failure.
          Thread.currentThread().interrupt();
          throw new ServiceException(t);
        } catch (IOException e) {
          throw new ServiceException(e);
        }
      }
    }

    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;

    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableDescriptor htd;
      try {
        String encodedName = region.getEncodedName();
        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
        final HRegion onlineRegion = server.getRegion(encodedName);
        if (onlineRegion != null) {
          // The region is already online. This should not happen any more.
          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
            + ", which is already online";
          LOG.warn(error);
          // server.abort(error);
          // throw new IOException(error);
          builder.addOpeningState(RegionOpeningState.OPENED);
          continue;
        }
        LOG.info("Open " + region.getRegionNameAsString());

        // Mark the region as in transition; `previous` tells us what, if anything, was already
        // in flight for it (TRUE = open in progress, FALSE = close in progress).
        final Boolean previous =
          server.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);

        if (Boolean.FALSE.equals(previous)) {
          if (server.getRegion(encodedName) != null) {
            // There is a close in progress. This should not happen any more.
            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
              + ", which we are already trying to CLOSE";
            server.abort(error);
            throw new IOException(error);
          }
          server.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
        }

        if (Boolean.TRUE.equals(previous)) {
          // An open is in progress. This is supported, but let's log this.
          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
            + ", which we are already trying to OPEN"
            + " - ignoring this new request for this region.");
        }

        // We are opening this region. If it moves back and forth for whatever reason, we don't
        // want to keep returning the stale moved record while we are opening/if we close again.
        server.removeFromMovedRegions(region.getEncodedName());

        if (previous == null || !previous.booleanValue()) {
          htd = htds.get(region.getTable());
          if (htd == null) {
            htd = server.getTableDescriptors().get(region.getTable());
            htds.put(region.getTable(), htd);
          }
          if (htd == null) {
            throw new IOException("Missing table descriptor for " + region.getEncodedName());
          }
          // If there is no action in progress, we can submit a specific handler.
          // Need to pass the expected version in the constructor.
          if (server.getExecutorService() == null) {
            LOG.info("No executor executorService; skipping open request");
          } else {
            if (region.isMetaRegion()) {
              server.getExecutorService()
                .submit(new OpenMetaHandler(server, server, region, htd, masterSystemTime));
            } else {
              if (regionOpenInfo.getFavoredNodesCount() > 0) {
                server.updateRegionFavoredNodesMapping(region.getEncodedName(),
                  regionOpenInfo.getFavoredNodesList());
              }
              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
                server.getExecutorService().submit(
                  new OpenPriorityRegionHandler(server, server, region, htd, masterSystemTime));
              } else {
                server.getExecutorService()
                  .submit(new OpenRegionHandler(server, server, region, htd, masterSystemTime));
              }
            }
          }
        }

        builder.addOpeningState(RegionOpeningState.OPENED);
      } catch (IOException ie) {
        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
        if (isBulkAssign) {
          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
        } else {
          throw new ServiceException(ie);
        }
      }
    }
    return builder.build();
  }

  /**
   * Warmup a region on this server. This method should only be called by Master. It synchronously
   * opens the region and closes the region bringing the most important pages in cache.
   */
  @Override
  public WarmupRegionResponse warmupRegion(final RpcController controller,
    final WarmupRegionRequest request) throws ServiceException {
    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
    try {
      checkOpen();
      String encodedName = region.getEncodedName();
      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
      final HRegion onlineRegion = server.getRegion(encodedName);
      // Warmup is a no-op when the region is already online or is in transition here.
      if (onlineRegion != null) {
        LOG.info("{} is online; skipping warmup", region);
        return response;
      }
      TableDescriptor htd = server.getTableDescriptors().get(region.getTable());
      if (server.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
        LOG.info("{} is in transition; skipping warmup", region);
        return response;
      }
      LOG.info("Warmup {}", region.getRegionNameAsString());
      HRegion.warmupHRegion(region, htd, server.getWAL(region), server.getConfiguration(), server,
        null);
    } catch (IOException ie) {
      LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
      throw new ServiceException(ie);
    }

    return response;
  }

  /**
   * Detach and return the controller's cell scanner, clearing it on the controller so it is
   * consumed exactly once.
   */
  private ExtendedCellScanner getAndReset(RpcController controller) {
    HBaseRpcController hrc = (HBaseRpcController) controller;
    ExtendedCellScanner cells = hrc.cellScanner();
    hrc.setCellScanner(null);
    return cells;
  }

  /**
   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
   * that the given mutations will be durable on the receiving RS if this method returns without any
   * exception.
   * @param controller the RPC controller
   * @param request the request
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for
   *             compatibility with old region replica implementation.
Now we will use
   *             {@code replicateToReplica} method instead.
   */
  @Deprecated
  @Override
  @QosPriority(priority = HConstants.REPLAY_QOS)
  public ReplicateWALEntryResponse replay(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    ExtendedCellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      // All entries in a replay request must target the same region (validated below).
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      RegionCoprocessorHost coprocessorHost =
        ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
          ? region.getCoprocessorHost()
          : null; // do not invoke coprocessors if this is a secondary region replica

      // Skip adding the edits to WAL if this is a secondary region replica
      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;

      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException("Replay request contains entries from multiple "
            + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
            + entry.getKey().getEncodedRegionName());
        }
        if (server.nonceManager != null && isPrimary) {
          // Record nonces seen in the WAL so replayed operations are not re-applied later.
          long nonceGroup =
            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          server.nonceManager.reportOperationFromWal(nonceGroup, nonce,
            entry.getKey().getWriteTime());
        }
        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
        List<MutationReplay> edits =
          WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
        if (edits != null && !edits.isEmpty()) {
          // HBASE-17924
          // sort to improve lock efficiency
          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
          long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
            ? entry.getKey().getOrigSequenceNumber()
            : entry.getKey().getLogSequenceNumber();
          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
          // check if it's a partial success
          for (int i = 0; result != null && i < result.length; i++) {
            if (result[i] != OperationStatus.SUCCESS) {
              throw new IOException(result[i].getExceptionMsg());
            }
          }
        }
      }

      // sync wal at the end because ASYNC_WAL is used above
      WAL wal = region.getWAL();
      if (wal != null) {
        wal.sync();
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record replay latency whether the call succeeded or failed.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
      }
    }
  }

  /**
   * Replay the given changes on a secondary replica
   */
  @Override
  public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
    ReplicateWALEntryRequest request) throws ServiceException {
    CellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return
ReplicateWALEntryResponse.newBuilder().build();
      }
      // All entries in one request must target the same (secondary replica) region.
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
        throw new DoNotRetryIOException(
          "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
      }
      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException(
            "ReplicateToReplica request contains entries from multiple " + "regions. First region:"
              + regionName.toStringUtf8() + " , other region:"
              + entry.getKey().getEncodedRegionName());
        }
        region.replayWALEntry(entry, cells);
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
    ReplicationSourceService replicationSource = server.getReplicationSourceService();
    if (replicationSource == null || entries.isEmpty()) {
      return;
    }
    // We can ensure that all entries are for one peer, so only need to check one entry's
    // table name. if the table hit sync replication at peer side and the peer cluster
    // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply
    // those entries according to the design doc.
    TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray());
    if (
      replicationSource.getSyncReplicationPeerInfoProvider().checkState(table,
        RejectReplicationRequestStateChecker.get())
    ) {
      throw new DoNotRetryIOException(
        "Reject to apply to sink cluster because sync replication state of sink cluster "
          + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table);
    }
  }

  /**
   * Replicate WAL entries on the region server.
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.REPLICATION_QOS)
  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
    try {
      checkOpen();
      if (server.getReplicationSinkService() != null) {
        requestCount.increment();
        List<WALEntry> entries = request.getEntryList();
        // May throw DoNotRetryIOException when sync replication rejects applying here.
        checkShouldRejectReplicationRequest(entries);
        ExtendedCellScanner cellScanner = getAndReset(controller);
        server.getRegionServerCoprocessorHost().preReplicateLogEntries();
        server.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
          request.getSourceHFileArchiveDirPath());
        server.getRegionServerCoprocessorHost().postReplicateLogEntries();
        return ReplicateWALEntryResponse.newBuilder().build();
      } else {
        throw new ServiceException("Replication services are not initialized yet");
      }
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Roll the WAL writer of the region server.
2255 * @param controller the RPC controller 2256 * @param request the request 2257 */ 2258 @Override 2259 @QosPriority(priority = HConstants.ADMIN_QOS) 2260 public RollWALWriterResponse rollWALWriter(final RpcController controller, 2261 final RollWALWriterRequest request) throws ServiceException { 2262 try { 2263 checkOpen(); 2264 requestCount.increment(); 2265 server.getRegionServerCoprocessorHost().preRollWALWriterRequest(); 2266 server.getWalRoller().requestRollAll(); 2267 server.getRegionServerCoprocessorHost().postRollWALWriterRequest(); 2268 RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder(); 2269 return builder.build(); 2270 } catch (IOException ie) { 2271 throw new ServiceException(ie); 2272 } 2273 } 2274 2275 /** 2276 * Stop the region server. 2277 * @param controller the RPC controller 2278 * @param request the request 2279 */ 2280 @Override 2281 @QosPriority(priority = HConstants.ADMIN_QOS) 2282 public StopServerResponse stopServer(final RpcController controller, 2283 final StopServerRequest request) throws ServiceException { 2284 rpcPreCheck("stopServer"); 2285 requestCount.increment(); 2286 String reason = request.getReason(); 2287 server.stop(reason); 2288 return StopServerResponse.newBuilder().build(); 2289 } 2290 2291 @Override 2292 public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller, 2293 UpdateFavoredNodesRequest request) throws ServiceException { 2294 rpcPreCheck("updateFavoredNodes"); 2295 List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList(); 2296 UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder(); 2297 for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) { 2298 RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion()); 2299 if (regionUpdateInfo.getFavoredNodesCount() > 0) { 2300 server.updateRegionFavoredNodesMapping(hri.getEncodedName(), 2301 
          regionUpdateInfo.getFavoredNodesList());
      }
    }
    // The response carries the number of update-info entries received, not the count applied.
    respBuilder.setResponse(openInfoList.size());
    return respBuilder.build();
  }

  /**
   * Atomically bulk load several HFiles into an open region
   * @return true if successful, false is failed but recoverably (no action)
   * @throws ServiceException if failed unrecoverably
   */
  @Override
  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
    final BulkLoadHFileRequest request) throws ServiceException {
    long start = EnvironmentEdgeManager.currentTime();
    // If this cluster's id is already in the request's cluster-id list, the load has visited
    // this cluster before: report loaded without re-applying. Otherwise record ourselves before
    // delegating (presumably loop prevention for replicated bulk loads — confirm against the
    // bulk-load replication design).
    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
    if (clusterIds.contains(this.server.getClusterId())) {
      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
    } else {
      clusterIds.add(this.server.getClusterId());
    }
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
      // Negative means "unknown / do not account"; see the size>0 guard below.
      long sizeToBeLoaded = -1;

      // Check to see if this bulk load would exceed the space quota for this table
      if (spaceQuotaEnabled) {
        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
        SpaceViolationPolicyEnforcement enforcement =
          activeSpaceQuotas.getPolicyEnforcement(region);
        if (enforcement != null) {
          // Bulk loads must still be atomic. We must enact all or none.
          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
          for (FamilyPath familyPath : request.getFamilyPathList()) {
            filePaths.add(familyPath.getPath());
          }
          // Check if the batch of files exceeds the current quota
          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
        }
      }
      // secure bulk load
      Map<byte[], List<Path>> map =
        server.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
      // A null map from the secure bulk load manager means the load did not happen.
      builder.setLoaded(map != null);
      if (map != null) {
        // Treat any negative size as a flag to "ignore" updating the region size as that is
        // not possible to occur in real life (cannot bulk load a file with negative size)
        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
              + sizeToBeLoaded + " bytes");
          }
          // Inform space quotas of the new files for this region
          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
            sizeToBeLoaded);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record bulk-load latency whether the call succeeded or failed.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
      }
    }
  }

  /** Obtains a bulk-load token for the target region from the secure bulk load manager. */
  @Override
  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
    PrepareBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      String bulkToken = server.getSecureBulkLoadManager().prepareBulkLoad(region, request);
      PrepareBulkLoadResponse.Builder builder =
PrepareBulkLoadResponse.newBuilder(); 2385 builder.setBulkToken(bulkToken); 2386 return builder.build(); 2387 } catch (IOException ie) { 2388 throw new ServiceException(ie); 2389 } 2390 } 2391 2392 @Override 2393 public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller, 2394 CleanupBulkLoadRequest request) throws ServiceException { 2395 try { 2396 checkOpen(); 2397 requestCount.increment(); 2398 2399 HRegion region = getRegion(request.getRegion()); 2400 2401 server.getSecureBulkLoadManager().cleanupBulkLoad(region, request); 2402 return CleanupBulkLoadResponse.newBuilder().build(); 2403 } catch (IOException ie) { 2404 throw new ServiceException(ie); 2405 } 2406 } 2407 2408 @Override 2409 public CoprocessorServiceResponse execService(final RpcController controller, 2410 final CoprocessorServiceRequest request) throws ServiceException { 2411 try { 2412 checkOpen(); 2413 requestCount.increment(); 2414 HRegion region = getRegion(request.getRegion()); 2415 Message result = execServiceOnRegion(region, request.getCall()); 2416 CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder(); 2417 builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, 2418 region.getRegionInfo().getRegionName())); 2419 // TODO: COPIES!!!!!! 
2420 builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue( 2421 org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray()))); 2422 return builder.build(); 2423 } catch (IOException ie) { 2424 throw new ServiceException(ie); 2425 } 2426 } 2427 2428 private FileSystem getFileSystem(List<String> filePaths) throws IOException { 2429 if (filePaths.isEmpty()) { 2430 // local hdfs 2431 return server.getFileSystem(); 2432 } 2433 // source hdfs 2434 return new Path(filePaths.get(0)).getFileSystem(server.getConfiguration()); 2435 } 2436 2437 private Message execServiceOnRegion(HRegion region, 2438 final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException { 2439 // ignore the passed in controller (from the serialized call) 2440 ServerRpcController execController = new ServerRpcController(); 2441 return region.execService(execController, serviceCall); 2442 } 2443 2444 private boolean shouldRejectRequestsFromClient(HRegion region) { 2445 TableName table = region.getRegionInfo().getTable(); 2446 ReplicationSourceService service = server.getReplicationSourceService(); 2447 return service != null && service.getSyncReplicationPeerInfoProvider().checkState(table, 2448 RejectRequestsFromClientStateChecker.get()); 2449 } 2450 2451 private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException { 2452 if (shouldRejectRequestsFromClient(region)) { 2453 throw new DoNotRetryIOException( 2454 region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state."); 2455 } 2456 } 2457 2458 /** 2459 * Get data from a table. 
   * @param controller the RPC controller
   * @param request the get request
   */
  @Override
  public GetResponse get(final RpcController controller, final GetRequest request)
    throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    OperationQuota quota = null;
    HRegion region = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    try {
      checkOpen();
      requestCount.increment();
      rpcGetRequestCount.increment();
      region = getRegion(request.getRegion());
      rejectIfInStandByState(region);

      GetResponse.Builder builder = GetResponse.newBuilder();
      ClientProtos.Get get = request.getGet();
      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
          + "reverse Scan.");
      }
      Boolean existence = null;
      Result r = null;
      quota = getRpcQuotaManager().checkBatchQuota(region, OperationQuota.OperationType.GET);

      Get clientGet = ProtobufUtil.toGet(get);
      // For existence-only gets, let a coprocessor answer without doing the read.
      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
        existence = region.getCoprocessorHost().preExists(clientGet);
      }
      if (existence == null) {
        if (context != null) {
          // Scanner-based path; the RPC callback closes the scanner (see private get below).
          r = get(clientGet, (region), null, context);
        } else {
          // for test purpose
          r = region.get(clientGet);
        }
        if (get.getExistenceOnly()) {
          boolean exists = r.getExists();
          if (region.getCoprocessorHost() != null) {
            exists = region.getCoprocessorHost().postExists(clientGet, exists);
          }
          existence = exists;
        }
      }
      if (existence != null) {
        // Existence-only response; stale flag is set for reads served by a secondary replica.
        ClientProtos.Result pbr =
          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
        builder.setResult(pbr);
      } else if (r != null) {
        ClientProtos.Result pbr;
        // Ship cells in a side cell block (no copy into the pb message) when the client
        // supports cell blocks and is at least version 1.3; otherwise embed the full Result.
        if (
          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
        ) {
          pbr = ProtobufUtil.toResultNoData(r);
          ((HBaseRpcController) controller).setCellScanner(
            PrivateCellUtil.createExtendedCellScanner(ClientInternalHelper.getExtendedRawCells(r)));
          addSize(context, r);
        } else {
          pbr = ProtobufUtil.toResult(r);
        }
        builder.setResult(pbr);
      }
      // r.cells is null when an table.exists(get) call
      if (r != null && r.rawCells() != null) {
        quota.addGetResult(r);
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Update metrics and release the quota even when the call failed.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null && region != null) {
        long blockBytesScanned = context != null ? context.getBlockBytesScanned() : 0;
        metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
          blockBytesScanned);
      }
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Services a Get by running it as a single-next Scan so the scanner is closed via a callback
   * rather than inline: when {@code closeCallBack} is null the scanner is registered on the
   * RpcCallContext (callers pass a non-null {@code context} in that case); otherwise it is added
   * to the multi() close callback which closes all scanners it collected.
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
    RpcCallContext context) throws IOException {
    region.prepareGet(get);
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        // The coprocessor short-circuited the get; return whatever it populated.
        region.metricsUpdateForGet();
        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
          stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet();

    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
  }

  /**
   * Warns about (and, when configured, rejects) multi requests whose total action count exceeds
   * rowSizeWarnThreshold. See HBASE-18023.
   */
  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
    int sum = 0;
    String firstRegionName = null;
    for (RegionAction regionAction : request.getRegionActionList()) {
      // Capture the first region seen for the log/error message.
      if (sum == 0) {
        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
      }
      sum += regionAction.getActionCount();
    }
    if (sum > rowSizeWarnThreshold) {
      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
        + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
        + RpcServer.getRequestUserName().orElse(null) + "/"
        + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
      if (rejectRowsWithSizeOverThreshold) {
        throw new ServiceException(
          "Rejecting large batch operation for current batch with firstRegionName: "
            + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
            + rowSizeWarnThreshold);
      }
    }
  }

  /**
   * Records a whole RegionAction as failed in the response and advances the shared CellScanner
   * past the cells of that RegionAction's mutations, so subsequent RegionActions in the same
   * request still see their own cells.
   */
  private void failRegionAction(MultiResponse.Builder responseBuilder,
    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
    CellScanner cellScanner, Throwable error) {
    rpcServer.getMetrics().exception(error);
    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
    // All Mutations in this RegionAction not executed as we can not see the Region online here
    // in this RS. Will be retried from Client.
    // Skipping all the Cells in CellScanner
    // corresponding to these Mutations.
    if (cellScanner != null) {
      skipCellsForMutations(regionAction.getActionList(), cellScanner);
    }
  }

  /**
   * Returns true when this action carries a replication-originated Put/Delete, identified by the
   * special replication attribute set on the mutation.
   */
  private boolean isReplicationRequest(Action action) {
    // replication request can only be put or delete.
    if (!action.hasMutation()) {
      return false;
    }
    MutationProto mutation = action.getMutation();
    MutationType type = mutation.getMutateType();
    if (type != MutationType.PUT && type != MutationType.DELETE) {
      return false;
    }
    // replication will set a special attribute so we can make use of it to decide whether a request
    // is for replication.
    return mutation.getAttributeList().stream().map(p -> p.getName())
      .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
  }

  /**
   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
   * @param rpcc the RPC controller
   * @param request the multi request
   */
  @Override
  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
    throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }

    checkBatchSizeAndLogLargeSize(request);

    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? getAndReset(controller) : null;

    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;

    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
    this.rpcMultiRequestCount.increment();
    this.requestCount.increment();
    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();

    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
    // following logic is for backward compatibility as old clients still use
    // MultiRequest#condition in case of checkAndMutate with RowMutations.
    if (request.hasCondition()) {
      if (request.getRegionActionList().isEmpty()) {
        // If the region action list is empty, do nothing.
        responseBuilder.setProcessed(true);
        return responseBuilder.build();
      }

      // Old-style conditional multi only ever carries one RegionAction.
      RegionAction regionAction = request.getRegionAction(0);

      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
      // we can assume regionAction.getAtomic() is true here.
      assert regionAction.getAtomic();

      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
          regionAction.hasCondition());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        return responseBuilder.build();
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
        // We only allow replication in standby state and it will not set the atomic flag.
        if (rejectIfFromClient) {
          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
            new DoNotRetryIOException(
              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
          return responseBuilder.build();
        }

        try {
          CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
            cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
          responseBuilder.setProcessed(result.isSuccess());
          ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
            ClientProtos.ResultOrException.newBuilder();
          for (int i = 0; i < regionAction.getActionCount(); i++) {
            // To unify the response format with doNonAtomicRegionMutation and read through
            // client's AsyncProcess we have to add an empty result instance per operation
            resultOrExceptionOrBuilder.clear();
            resultOrExceptionOrBuilder.setIndex(i);
            regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
          }
        } catch (IOException e) {
          rpcServer.getMetrics().exception(e);
          // As it's an atomic operation with a condition, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        }
      } finally {
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
      }
      return responseBuilder.build();
    }

    // this will contain all the cells that we need to return. It's created later, if needed.
    List<ExtendedCellScannable> cellsToReturn = null;
    RegionScannersCloseCallBack closeCallBack = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
      new HashMap<>(request.getRegionActionCount());

    // Modern path: one result (and optional load stats) per RegionAction.
    for (RegionAction regionAction : request.getRegionActionList()) {
      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();
      // The result builder is reused across iterations; reset it for this RegionAction.
      regionActionResultBuilder.clear();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
          regionAction.hasCondition());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        continue; // For this region it's a failure.
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);

        if (regionAction.hasCondition()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }

          try {
            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
              ClientProtos.ResultOrException.newBuilder();
            if (regionAction.getActionCount() == 1) {
              // Single-mutation checkAndMutate.
              CheckAndMutateResult result =
                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              resultOrExceptionOrBuilder.setIndex(0);
              if (result.getResult() != null) {
                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
              }
              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
            } else {
              // checkAndMutate over a RowMutations (multiple actions).
              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              for (int i = 0; i < regionAction.getActionCount(); i++) {
                if (i == 0 && result.getResult() != null) {
                  // Set the result of the Increment/Append operations to the first element of the
                  // ResultOrException list
                  resultOrExceptionOrBuilder.setIndex(i);
                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
                  continue;
                }
                // To unify the response format with doNonAtomicRegionMutation and read through
                // client's AsyncProcess we have to add an empty result instance per operation
                resultOrExceptionOrBuilder.clear();
                resultOrExceptionOrBuilder.setIndex(i);
                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
              }
            }
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's an atomic operation with a condition, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }
          try {
            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
              cellScanner, nonceGroup, spaceQuotaEnforcement);
            regionActionResultBuilder.setProcessed(true);
            // We no longer use MultiResponse#processed. Instead, we use
            // RegionActionResult#processed. This is for backward compatibility for old clients.
            responseBuilder.setProcessed(true);
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's atomic, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else {
          if (
            rejectIfFromClient && regionAction.getActionCount() > 0
              && !isReplicationRequest(regionAction.getAction(0))
          ) {
            // fail if it is not a replication request
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }
          // doNonAtomicRegionMutation manages the exception internally
          if (context != null && closeCallBack == null) {
            // An RpcCallBack that creates a list of scanners that needs to perform callBack
            // operation on completion of multiGets.
            // Set this only once
            closeCallBack = new RegionScannersCloseCallBack();
            context.setCallBack(closeCallBack);
          }
          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
            spaceQuotaEnforcement);
        }
      } finally {
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        regionStats.put(regionSpecifier, regionLoadStats);
      }
    }
    // Load the controller with the Cells to return.
    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
      controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(cellsToReturn));
    }

    // Attach per-region load statistics collected above.
    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
      builder.addRegion(stat.getKey());
      builder.addStat(stat.getValue());
    }
    responseBuilder.setRegionStatistics(builder);
    return responseBuilder.build();
  }

  /** Skips the cells of every mutation in {@code actions}; see {@link #skipCellsForMutation}. */
  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    for (Action action : actions) {
      skipCellsForMutation(action, cellScanner);
    }
  }

  /**
   * Advances the shared CellScanner past the cells associated with one abandoned mutation so the
   * scanner stays aligned for the remaining mutations in the request.
   */
  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    try {
      if (action.hasMutation()) {
        MutationProto m = action.getMutation();
        if (m.hasAssociatedCellCount()) {
          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
            cellScanner.advance();
          }
        }
      }
    } catch (IOException e) {
      // No need to handle these Individual Mutation level issue. Any way this entire RegionAction
      // marked as failed as we could not see the Region here. At client side the top level
      // RegionAction exception will be considered first.
      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
    }
  }

  /**
   * Mutate data in a table.
   * @param rpcc the RPC controller
   * @param request the mutate request
   */
  @Override
  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
    throws ServiceException {
    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
    OperationQuota quota = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    // Clear scanner so we are not holding on to reference across call.
    if (controller != null) {
      controller.setCellScanner(null);
    }
    try {
      checkOpen();
      requestCount.increment();
      rpcMutateRequestCount.increment();
      HRegion region = getRegion(request.getRegion());
      rejectIfInStandByState(region);
      MutateResponse.Builder builder = MutateResponse.newBuilder();
      MutationProto mutation = request.getMutation();
      // Give the memstore a chance to free memory before a non-meta write.
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
      OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request);
      quota = getRpcQuotaManager().checkBatchQuota(region, operationType);
      ActivePolicyEnforcement spaceQuotaEnforcement =
        getSpaceQuotaManager().getActiveEnforcements();

      if (request.hasCondition()) {
        // Conditional single mutation (checkAndMutate).
        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
          request.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
        builder.setProcessed(result.isSuccess());
        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
        if (clientCellBlockSupported) {
          addSize(context, result.getResult());
        }
      } else {
        Result r = null;
        Boolean processed = null;
        MutationType type = mutation.getMutateType();
        switch (type) {
          case APPEND:
            // TODO: this doesn't actually check anything.
            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
              context);
            break;
          case INCREMENT:
            // TODO: this doesn't actually check anything.
            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
              context);
            break;
          case PUT:
            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          case DELETE:
            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          default:
            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
        }
        if (processed != null) {
          builder.setProcessed(processed);
        }
        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
        addResult(builder, r, controller, clientCellBlockSupported);
        if (clientCellBlockSupported) {
          addSize(context, r);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      // An IOException on a write may indicate filesystem trouble; ask the server to verify.
      server.checkFileSystem();
      throw new ServiceException(ie);
    } finally {
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Applies a single Put after cell-size and space-quota checks, charging it against the
   * operation quota and recording put latency.
   */
  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Put put = ProtobufUtil.toPut(mutation, cellScanner);
    checkCellSizeLimit(region, put);
    spaceQuota.getPolicyEnforcement(region).check(put);
    quota.addMutation(put);
    region.put(put);

    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updatePut(region, after - before);
    }
  }

  /**
   * Applies a single Delete after cell-size and space-quota checks, charging it against the
   * operation quota and recording delete latency.
   */
  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
    checkCellSizeLimit(region, delete);
    spaceQuota.getPolicyEnforcement(region).check(delete);
    quota.addMutation(delete);
    region.delete(delete);

    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updateDelete(region, after - before);
    }
  }

  /**
   * Runs a single checkAndMutate: a coprocessor may pre-empt the region call entirely via
   * preCheckAndMutate; otherwise the region executes it and postCheckAndMutate may transform the
   * result. Records checkAndMutate plus checkAndPut/checkAndDelete metrics.
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
    quota.addMutation((Mutation) checkAndMutate.getAction());

    CheckAndMutateResult result = null;
    if (region.getCoprocessorHost() != null) {
      // A non-null result here bypasses the region call below.
      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
    }
    if (result == null) {
      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
      if (region.getCoprocessorHost() != null) {
        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
      }
    }
    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      long blockBytesScanned =
        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
      metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned);

      MutationType type = mutation.getMutateType();
      switch (type) {
        case PUT:
          metricsRegionServer.updateCheckAndPut(region, after - before);
          break;
        case DELETE:
          metricsRegionServer.updateCheckAndDelete(region, after - before);
          break;
        default:
          break;
      }
    }
    return result;
  }

  // This is used to keep compatible with the old client implementation. Consider remove it if we
  // decide to drop the support of the client that still sends close request to a region scanner
  // which has already been exhausted.
3089 @Deprecated 3090 private static final IOException SCANNER_ALREADY_CLOSED = new IOException() { 3091 3092 private static final long serialVersionUID = -4305297078988180130L; 3093 3094 @Override 3095 public synchronized Throwable fillInStackTrace() { 3096 return this; 3097 } 3098 }; 3099 3100 private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException { 3101 String scannerName = toScannerName(request.getScannerId()); 3102 RegionScannerHolder rsh = this.scanners.get(scannerName); 3103 if (rsh == null) { 3104 // just ignore the next or close request if scanner does not exists. 3105 Long lastCallSeq = closedScanners.getIfPresent(scannerName); 3106 if (lastCallSeq != null) { 3107 // Check the sequence number to catch if the last call was incorrectly retried. 3108 // The only allowed scenario is when the scanner is exhausted and one more scan 3109 // request arrives - in this case returning 0 rows is correct. 3110 if (request.hasNextCallSeq() && request.getNextCallSeq() != lastCallSeq + 1) { 3111 throw new OutOfOrderScannerNextException("Expected nextCallSeq for closed request: " 3112 + (lastCallSeq + 1) + " But the nextCallSeq got from client: " 3113 + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request)); 3114 } else { 3115 throw SCANNER_ALREADY_CLOSED; 3116 } 3117 } else { 3118 LOG.warn("Client tried to access missing scanner " + scannerName); 3119 throw new UnknownScannerException( 3120 "Unknown scanner '" + scannerName + "'. 
This can happen due to any of the following " 3121 + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " 3122 + "long wait between consecutive client checkins, c) Server may be closing down, " 3123 + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " 3124 + "possible fix would be increasing the value of" 3125 + "'hbase.client.scanner.timeout.period' configuration."); 3126 } 3127 } 3128 rejectIfInStandByState(rsh.r); 3129 RegionInfo hri = rsh.s.getRegionInfo(); 3130 // Yes, should be the same instance 3131 if (server.getOnlineRegion(hri.getRegionName()) != rsh.r) { 3132 String msg = "Region has changed on the scanner " + scannerName + ": regionName=" 3133 + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r; 3134 LOG.warn(msg + ", closing..."); 3135 scanners.remove(scannerName); 3136 try { 3137 rsh.s.close(); 3138 } catch (IOException e) { 3139 LOG.warn("Getting exception closing " + scannerName, e); 3140 } finally { 3141 try { 3142 server.getLeaseManager().cancelLease(scannerName); 3143 } catch (LeaseException e) { 3144 LOG.warn("Getting exception closing " + scannerName, e); 3145 } 3146 } 3147 throw new NotServingRegionException(msg); 3148 } 3149 return rsh; 3150 } 3151 3152 /** 3153 * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder 3154 * value. 3155 */ 3156 private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request, HRegion region, 3157 ScanResponse.Builder builder) throws IOException { 3158 rejectIfInStandByState(region); 3159 ClientProtos.Scan protoScan = request.getScan(); 3160 boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); 3161 Scan scan = ProtobufUtil.toScan(protoScan); 3162 // if the request doesn't set this, get the default region setting. 
3163 if (!isLoadingCfsOnDemandSet) { 3164 scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); 3165 } 3166 3167 if (!scan.hasFamilies()) { 3168 // Adding all families to scanner 3169 for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) { 3170 scan.addFamily(family); 3171 } 3172 } 3173 if (region.getCoprocessorHost() != null) { 3174 // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a 3175 // wrapper for the core created RegionScanner 3176 region.getCoprocessorHost().preScannerOpen(scan); 3177 } 3178 RegionScannerImpl coreScanner = region.getScanner(scan); 3179 Shipper shipper = coreScanner; 3180 RegionScanner scanner = coreScanner; 3181 try { 3182 if (region.getCoprocessorHost() != null) { 3183 scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); 3184 } 3185 } catch (Exception e) { 3186 // Although region coprocessor is for advanced users and they should take care of the 3187 // implementation to not damage the HBase system, closing the scanner on exception here does 3188 // not have any bad side effect, so let's do it 3189 scanner.close(); 3190 throw e; 3191 } 3192 long scannerId = scannerIdGenerator.generateNewScannerId(); 3193 builder.setScannerId(scannerId); 3194 builder.setMvccReadPoint(scanner.getMvccReadPoint()); 3195 builder.setTtl(scannerLeaseTimeoutPeriod); 3196 String scannerName = toScannerName(scannerId); 3197 3198 boolean fullRegionScan = 3199 !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region); 3200 3201 return new Pair<String, RegionScannerHolder>(scannerName, 3202 addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan)); 3203 } 3204 3205 /** 3206 * The returned String is used as key doing look up of outstanding Scanners in this Servers' 3207 * this.scanners, the Map of outstanding scanners and their current state. 3208 * @param scannerId A scanner long id. 
3209 * @return The long id as a String. 3210 */ 3211 private static String toScannerName(long scannerId) { 3212 return Long.toString(scannerId); 3213 } 3214 3215 private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh) 3216 throws OutOfOrderScannerNextException { 3217 // if nextCallSeq does not match throw Exception straight away. This needs to be 3218 // performed even before checking of Lease. 3219 // See HBASE-5974 3220 if (request.hasNextCallSeq()) { 3221 long callSeq = request.getNextCallSeq(); 3222 if (!rsh.incNextCallSeq(callSeq)) { 3223 throw new OutOfOrderScannerNextException( 3224 "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: " 3225 + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request)); 3226 } 3227 } 3228 } 3229 3230 private void addScannerLeaseBack(LeaseManager.Lease lease) { 3231 try { 3232 server.getLeaseManager().addLease(lease); 3233 } catch (LeaseStillHeldException e) { 3234 // should not happen as the scanner id is unique. 3235 throw new AssertionError(e); 3236 } 3237 } 3238 3239 // visible for testing only 3240 long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller, 3241 boolean allowHeartbeatMessages) { 3242 // Set the time limit to be half of the more restrictive timeout value (one of the 3243 // timeout values must be positive). In the event that both values are positive, the 3244 // more restrictive of the two is used to calculate the limit. 3245 if (allowHeartbeatMessages) { 3246 long now = EnvironmentEdgeManager.currentTime(); 3247 long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now); 3248 if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) { 3249 long timeLimitDelta; 3250 if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) { 3251 timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout); 3252 } else { 3253 timeLimitDelta = 3254 scannerLeaseTimeoutPeriod > 0 ? 
scannerLeaseTimeoutPeriod : remainingTimeout; 3255 } 3256 3257 // Use half of whichever timeout value was more restrictive... But don't allow 3258 // the time limit to be less than the allowable minimum (could cause an 3259 // immediate timeout before scanning any data). 3260 timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta); 3261 return now + timeLimitDelta; 3262 } 3263 } 3264 // Default value of timeLimit is negative to indicate no timeLimit should be 3265 // enforced. 3266 return -1L; 3267 } 3268 3269 private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) { 3270 long timeout; 3271 if (controller != null && controller.getCallTimeout() > 0) { 3272 timeout = controller.getCallTimeout(); 3273 } else if (rpcTimeout > 0) { 3274 timeout = rpcTimeout; 3275 } else { 3276 return -1; 3277 } 3278 if (call != null) { 3279 timeout -= (now - call.getReceiveTime()); 3280 } 3281 // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high. 3282 // return minimum value here in that case so we count this in calculating the final delta. 3283 return Math.max(minimumScanTimeLimitDelta, timeout); 3284 } 3285 3286 private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows, 3287 ScannerContext scannerContext, ScanResponse.Builder builder) { 3288 if (numOfCompleteRows >= limitOfRows) { 3289 if (LOG.isTraceEnabled()) { 3290 LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows 3291 + " scannerContext: " + scannerContext); 3292 } 3293 builder.setMoreResults(false); 3294 } 3295 } 3296 3297 // return whether we have more results in region. 
  /**
   * Performs the actual scanning loop for the scan RPC: repeatedly calls
   * {@code RegionScanner.nextRaw} on {@code rsh.s}, accumulating Results into {@code results}
   * until a size, time, or result-count limit is hit, and records limit/heartbeat/cursor state
   * into {@code builder}. The finally-block (continued past this span) updates server metrics.
   */
  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
    long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
    ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
    HRegion region = rsh.r;
    RegionScanner scanner = rsh.s;
    long maxResultSize;
    // Effective cell-size limit: the stricter of the scanner's configured max and the quota max.
    if (scanner.getMaxResultSize() > 0) {
      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
    } else {
      maxResultSize = maxQuotaResultSize;
    }
    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
    // arbitrary 32. TODO: keep record of general size of results being returned.
    ArrayList<Cell> values = new ArrayList<>(32);
    region.startRegionOperation(Operation.SCAN);
    long before = EnvironmentEdgeManager.currentTime();
    // Used to check if we've matched the row limit set on the Scan
    int numOfCompleteRows = 0;
    // Count of times we call nextRaw; can be > numOfCompleteRows.
    int numOfNextRawCalls = 0;
    try {
      int numOfResults = 0;
      synchronized (scanner) {
        // Results from a non-default replica are flagged stale.
        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
        boolean clientHandlesPartials =
          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
        boolean clientHandlesHeartbeats =
          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();

        // On the server side we must ensure that the correct ordering of partial results is
        // returned to the client to allow them to properly reconstruct the partial results.
        // If the coprocessor host is adding to the result list, we cannot guarantee the
        // correct ordering of partial results and so we prevent partial results from being
        // formed.
        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
        boolean moreRows = false;

        // Heartbeat messages occur when the processing of the ScanRequest exceeds a
        // certain time threshold on the server. When the time threshold is exceeded, the
        // server stops the scan and sends back whatever Results it has accumulated within
        // that time period (may be empty). Since heartbeat messages have the potential to
        // create partial Results (in the event that the timeout occurs in the middle of a
        // row), we must only generate heartbeat messages when the client can handle both
        // heartbeats AND partials
        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;

        long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);

        final LimitScope sizeScope =
          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
        final LimitScope timeScope =
          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;

        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();

        // Configure with limits for this RPC. Set keep progress true since size progress
        // towards size limit should be kept between calls to nextRaw
        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
        // of heap size occupied by cells(being read). Cell data means its key and value parts.
        // maxQuotaResultSize - max results just from server side configuration and quotas, without
        // user's specified max. We use this for evaluating limits based on blocks (not cells).
        // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
        // cell or block size from maximum here so we adhere to total limits of request.
        // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
        // have accumulated block bytes. If not, this will be 0 for block size.
        long maxCellSize = maxResultSize;
        long maxBlockSize = maxQuotaResultSize;
        if (rpcCall != null) {
          maxBlockSize -= rpcCall.getBlockBytesScanned();
          maxCellSize -= rpcCall.getResponseCellSize();
        }

        contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
        contextBuilder.setBatchLimit(scanner.getBatch());
        contextBuilder.setTimeLimit(timeScope, timeLimit);
        contextBuilder.setTrackMetrics(trackMetrics);
        ScannerContext scannerContext = contextBuilder.build();
        boolean limitReached = false;
        while (numOfResults < maxResults) {
          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
          // batch limit is a limit on the number of cells per Result. Thus, if progress is
          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
          // reset the batch progress between nextRaw invocations since we don't want the
          // batch progress from previous calls to affect future calls
          scannerContext.setBatchProgress(0);
          assert values.isEmpty();

          // Collect values to be returned here
          moreRows = scanner.nextRaw(values, scannerContext);
          if (rpcCall == null) {
            // When there is no RpcCallContext, copy EC to heap, then the scanner would close.
            // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
            // buffers. See more details in HBASE-26036.
            CellUtil.cloneIfNecessary(values);
          }
          numOfNextRawCalls++;

          if (!values.isEmpty()) {
            if (limitOfRows > 0) {
              // First we need to check if the last result is partial and we have a row change. If
              // so then we need to increase the numOfCompleteRows.
              if (results.isEmpty()) {
                if (
                  rsh.rowOfLastPartialResult != null
                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              } else {
                Result lastResult = results.get(results.size() - 1);
                if (
                  lastResult.mayHaveMoreCellsInRow()
                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              }
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
            results.add(r);
            numOfResults++;
            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
              numOfCompleteRows++;
              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
          } else if (!moreRows && !results.isEmpty()) {
            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
            // last result is false. Otherwise it's possible that: the first nextRaw returned
            // because BATCH_LIMIT_REACHED (BTW it happens to exhaust all cells of the scan), so
            // the last result's mayHaveMoreCellsInRow will be true, while the following nextRaw
            // will return with moreRows=false, which means moreResultsInRegion would be false —
            // a contradictory state (HBASE-21206).
            int lastIdx = results.size() - 1;
            Result r = results.get(lastIdx);
            if (r.mayHaveMoreCellsInRow()) {
              results.set(lastIdx, ClientInternalHelper.createResult(
                ClientInternalHelper.getExtendedRawCells(r), r.getExists(), r.isStale(), false));
            }
          }
          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
          boolean resultsLimitReached = numOfResults >= maxResults;
          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

          if (limitReached || !moreRows) {
            // With block size limit, we may exceed size limit without collecting any results.
            // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
            // or cursor if results were collected, for example for cell size or heap size limits.
            boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
            // We only want to mark a ScanResponse as a heartbeat message in the event that
            // there are more values to be read server side. If there aren't more values,
            // marking it as a heartbeat is wasteful because the client will need to issue
            // another ScanRequest only to realize that they already have all the values
            if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
              // Heartbeat messages occur when the time limit has been reached, or size limit has
              // been reached before collecting any results. This can happen for heavily filtered
              // scans which scan over too many blocks.
              builder.setHeartbeatMessage(true);
              if (rsh.needCursor) {
                Cell cursorCell = scannerContext.getLastPeekedCell();
                if (cursorCell != null) {
                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
                }
              }
            }
            break;
          }
          values.clear();
        }
        if (rpcCall != null) {
          rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
        }
        builder.setMoreResultsInRegion(moreRows);
        // Check to see if the client requested that we track metrics server side. If the
        // client requested metrics, retrieve the metrics from the scanner context.
        if (trackMetrics) {
          // rather than increment yet another counter in StoreScanner, just set the value here
          // from block size progress before writing into the response
          scannerContext.getMetrics().countOfBlockBytesScanned
            .set(scannerContext.getBlockSizeProgress());
          if (rpcCall != null) {
            scannerContext.getMetrics().fsReadTime.set(rpcCall.getFsReadTime());
          }
          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();

          for (Entry<String, Long> entry : metrics.entrySet()) {
            pairBuilder.setName(entry.getKey());
            pairBuilder.setValue(entry.getValue());
            metricBuilder.addMetrics(pairBuilder.build());
          }

          builder.setScanMetrics(metricBuilder.build());
        }
      }
    } finally {
      region.closeRegionOperation();
      // Update serverside metrics, even on error.
      long end = EnvironmentEdgeManager.currentTime();

      long responseCellSize = 0;
      long blockBytesScanned = 0;
      if (rpcCall != null) {
        responseCellSize = rpcCall.getResponseCellSize();
        blockBytesScanned = rpcCall.getBlockBytesScanned();
        rsh.updateBlockBytesScanned(blockBytesScanned);
      }
      region.getMetrics().updateScan();
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned);
        metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls);
      }
    }
    // coprocessor postNext hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
    }
  }

  /**
   * Scan data in a table. Handles open/next/renew/close variants of the ScanRequest, lease
   * management, coprocessor hooks, quota accounting, and the compatibility rules around
   * moreResults/moreResultsInRegion flags.
   * @param controller the RPC controller
   * @param request    the scan request
   */
  @Override
  public ScanResponse scan(final RpcController controller, final ScanRequest request)
    throws ServiceException {
    if (controller != null && !(controller instanceof HBaseRpcController)) {
      throw new UnsupportedOperationException(
        "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
    }
    if (!request.hasScannerId() && !request.hasScan()) {
      throw new ServiceException(
        new DoNotRetryIOException("Missing required input: scannerId or scan"));
    }
    try {
      checkOpen();
    } catch (IOException e) {
      // Server is stopping: best-effort cancel of the scanner's lease before failing the call.
      if (request.hasScannerId()) {
        String scannerName = toScannerName(request.getScannerId());
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Server shutting down and client tried to access missing scanner " + scannerName);
        }
        final LeaseManager leaseManager = server.getLeaseManager();
        if (leaseManager != null) {
          try {
            leaseManager.cancelLease(scannerName);
          } catch (LeaseException le) {
            // No problem, ignore
            if (LOG.isTraceEnabled()) {
              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
            }
          }
        }
      }
      throw new ServiceException(e);
    }
    requestCount.increment();
    rpcScanRequestCount.increment();
    RegionScannerContext rsx;
    ScanResponse.Builder builder = ScanResponse.newBuilder();
    try {
      rsx = checkQuotaAndGetRegionScannerContext(request, builder);
    } catch (IOException e) {
      if (e == SCANNER_ALREADY_CLOSED) {
        // Now we will close scanner automatically if there are no more results for this region but
        // the old client will still send a close request to us. Just ignore it and return.
        return builder.build();
      }
      throw new ServiceException(e);
    }
    String scannerName = rsx.scannerName;
    RegionScannerHolder rsh = rsx.holder;
    OperationQuota quota = rsx.quota;
    if (rsh.fullRegionScan) {
      rpcFullScanRequestCount.increment();
    }
    HRegion region = rsh.r;
    LeaseManager.Lease lease;
    try {
      // Remove lease while its being processed in server; protects against case
      // where processing of request takes > lease expiration time. or null if none found.
      lease = server.getLeaseManager().removeLease(scannerName);
    } catch (LeaseException e) {
      throw new ServiceException(e);
    }
    if (request.hasRenew() && request.getRenew()) {
      // Renew-only request: add the lease back and return without scanning.
      addScannerLeaseBack(lease);
      try {
        checkScanNextCallSeq(request, rsh);
      } catch (OutOfOrderScannerNextException e) {
        throw new ServiceException(e);
      }
      return builder.build();
    }
    try {
      checkScanNextCallSeq(request, rsh);
    } catch (OutOfOrderScannerNextException e) {
      addScannerLeaseBack(lease);
      throw new ServiceException(e);
    }
    // Now we have increased the next call sequence. If we give client an error, the retry will
    // never succeed. So we'd better close the scanner and return a DoNotRetryIOException to client
    // and then client will try to open a new scanner.
    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
    int rows; // this is scan.getCaching
    if (request.hasNumberOfRows()) {
      rows = request.getNumberOfRows();
    } else {
      rows = closeScanner ? 0 : 1;
    }
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    // now let's do the real scan.
    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
    RegionScanner scanner = rsh.s;
    // this is the limit of rows for this scan; if the number of rows reaches this value, we will
    // close the scanner.
    int limitOfRows;
    if (request.hasLimitOfRows()) {
      limitOfRows = request.getLimitOfRows();
    } else {
      limitOfRows = -1;
    }
    boolean scannerClosed = false;
    try {
      List<Result> results = new ArrayList<>(Math.min(rows, 512));
      if (rows > 0) {
        boolean done = false;
        // Call coprocessor. Get region info from scanner.
        if (region.getCoprocessorHost() != null) {
          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
          if (!results.isEmpty()) {
            for (Result r : results) {
              // add cell size from CP results so we can track response size and update limits
              // when calling scan below if !done. We'll also have tracked block size if the CP
              // got results from hbase, since StoreScanner tracks that for all calls automatically.
              addSize(rpcCall, r);
            }
          }
          if (bypass != null && bypass.booleanValue()) {
            done = true;
          }
        }
        if (!done) {
          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
            results, builder, rpcCall);
        } else {
          builder.setMoreResultsInRegion(!results.isEmpty());
        }
      } else {
        // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
        builder.setMoreResultsInRegion(true);
      }

      quota.addScanResult(results);
      addResults(builder, results, (HBaseRpcController) controller,
        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
        isClientCellBlockSupport(rpcCall));
      if (scanner.isFilterDone() && results.isEmpty()) {
        // If the scanner's filter - if any - is done with the scan
        // only set moreResults to false if the results is empty. This is used to keep compatible
        // with the old scan implementation where we just ignore the returned results if moreResults
        // is false. Can remove the isEmpty check after we get rid of the old implementation.
        builder.setMoreResults(false);
      }
      // Later we may close the scanner depending on this flag so here we need to make sure that we
      // have already set this flag.
      assert builder.hasMoreResultsInRegion();
      // we only set moreResults to false in the above code, so set it to true if we haven't set it
      // yet.
      if (!builder.hasMoreResults()) {
        builder.setMoreResults(true);
      }
      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
        // Record the last cell of the last result if it is a partial result
        // We need this to calculate the complete rows we have returned to client as the
        // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
        // current row. We may filter out all the remaining cells for the current row and just
        // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
        // check for row change.
        Result lastResult = results.get(results.size() - 1);
        if (lastResult.mayHaveMoreCellsInRow()) {
          rsh.rowOfLastPartialResult = lastResult.getRow();
        } else {
          rsh.rowOfLastPartialResult = null;
        }
      }
      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
        scannerClosed = true;
        closeScanner(region, scanner, scannerName, rpcCall, false);
      }

      // There's no point returning to a timed out client. Throwing ensures scanner is closed
      if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) {
        throw new TimeoutIOException("Client deadline exceeded, cannot return results");
      }

      return builder.build();
    } catch (IOException e) {
      try {
        // scanner is closed here
        scannerClosed = true;
        // The scanner state might be left in a dirty state, so we will tell the Client to
        // fail this RPC and close the scanner while opening up another one from the start of
        // row that the client has last seen.
        closeScanner(region, scanner, scannerName, rpcCall, true);

        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
        // used in two different semantics.
        // (1) The first is to close the client scanner and bubble up the exception all the way
        // to the application. This is preferred when the exception is really un-recoverable
        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
        // bucket usually.
        // (2) Second semantics is to close the current region scanner only, but continue the
        // client scanner by overriding the exception. This is usually UnknownScannerException,
        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
        // application-level ClientScanner has to continue without bubbling up the exception to
        // the client. See ClientScanner code to see how it deals with these special exceptions.
        if (e instanceof DoNotRetryIOException) {
          throw e;
        }

        // If it is a FileNotFoundException, wrap as a
        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
        if (e instanceof FileNotFoundException) {
          throw new DoNotRetryIOException(e);
        }

        // We closed the scanner already. Instead of throwing the IOException, and client
        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
        // a special exception to save an RPC.
        // NOTE(review): rpcCall may be null here (it comes from getCurrentCall().orElse(null)
        // above) — confirm this error path is only reachable from a real RPC, else this can NPE.
        if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
          // 1.4.0+ clients know how to handle
          throw new ScannerResetException("Scanner is closed on the server-side", e);
        } else {
          // older clients do not know about SRE. Just throw USE, which they will handle
          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
            + " scanner state for clients older than 1.3.", e);
        }
      } catch (IOException ioe) {
        throw new ServiceException(ioe);
      }
    } finally {
      if (!scannerClosed) {
        // Adding resets expiration time on lease.
        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
        if (rpcCall != null) {
          rpcCall.setCallBack(rsh.shippedCallback);
        } else {
          // If context is null, here we call rsh.shippedCallback directly to reuse the logic in
          // rsh.shippedCallback to release the internal resources in rsh, and lease is also added
          // back to regionserver's LeaseManager in rsh.shippedCallback.
          runShippedCallback(rsh);
        }
      }
      quota.close();
    }
  }

  /**
   * Runs the holder's shipped callback directly, reusing its logic to release the internal
   * resources held by {@code rsh} and to return the lease to the region server's LeaseManager.
   * Used when there is no RpcCallContext to attach the callback to.
   * @throws ServiceException if the callback itself fails with an IOException
   */
  private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
    assert rsh.shippedCallback != null;
    try {
      rsh.shippedCallback.run();
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
  }

  /**
   * Closes the region scanner registered under {@code scannerName}, honoring the coprocessor
   * pre/post scanner-close hooks. When {@code context} is non-null the actual close is deferred
   * to the holder's close callback; otherwise the scanner is closed inline. Cleanly closed
   * scanners (isError == false) are remembered in {@code closedScanners}.
   */
  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
    RpcCallContext context, boolean isError) throws IOException {
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preScannerClose(scanner)) {
        // bypass the actual close.
        // Coprocessor vetoed the close; skip the server-side cleanup entirely.
        return;
      }
    }
    RegionScannerHolder rsh = scanners.remove(scannerName);
    if (rsh != null) {
      if (context != null) {
        // Defer the actual close to the RPC framework via the holder's close callback.
        context.setCallBack(rsh.closeCallBack);
      } else {
        // No RPC context to hand the callback to; close the wrapped scanner directly.
        rsh.s.close();
      }
      if (region.getCoprocessorHost() != null) {
        region.getCoprocessorHost().postScannerClose(scanner);
      }
      if (!isError) {
        // Remember cleanly closed scanners together with their last call sequence so later
        // RPCs against this scanner id can be recognized.
        closedScanners.put(scannerName, rsh.getNextCallSeq());
      }
    }
  }

  /** Dispatches a region-server-level coprocessor service call to the server. */
  @Override
  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
    CoprocessorServiceRequest request) throws ServiceException {
    rpcPreCheck("execRegionServerService");
    return server.execRegionServerService(controller, request);
  }

  /**
   * Returns the space quota snapshots currently known to this region server, one entry per
   * table. Responds with an empty snapshot list when the space quota manager is not available.
   */
  @Override
  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
    GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
    try {
      final RegionServerSpaceQuotaManager manager = server.getRegionServerSpaceQuotaManager();
      final GetSpaceQuotaSnapshotsResponse.Builder builder =
        GetSpaceQuotaSnapshotsResponse.newBuilder();
      if (manager != null) {
        // Iterate over the manager's copied view of the snapshots.
        final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
        for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
            .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
            .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build());
        }
      }
      return builder.build();
    } catch (Exception e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Evicts block cache entries for the requested regions and reports per-region eviction
   * statistics; a failure on one region is recorded in the stats rather than failing the call.
   */
  @Override
  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
    ClearRegionBlockCacheRequest request) throws ServiceException {
    try {
      rpcPreCheck("clearRegionBlockCache");
      ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder();
      CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
      server.getRegionServerCoprocessorHost().preClearRegionBlockCache();
      List<HRegion> regions = getRegions(request.getRegionList(), stats);
      for (HRegion region : regions) {
        try {
          stats = stats.append(this.server.clearRegionBlockCache(region));
        } catch (Exception e) {
          // Record the per-region failure in the stats instead of aborting the whole call.
          stats.addException(region.getRegionInfo().getRegionName(), e);
        }
      }
      // Max cache size defaults to 0 when no block cache is configured on this server.
      stats.withMaxCacheSize(server.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
      server.getRegionServerCoprocessorHost().postClearRegionBlockCache(stats.build());
      return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Schedules an AssignRegionHandler for every region in the open request whose procedure id is
   * accepted by the server. Table descriptors are looked up through {@code tdCache} first so a
   * request covering many regions of one table does not repeat the descriptor read.
   */
  private void executeOpenRegionProcedures(OpenRegionRequest request,
    Map<TableName, TableDescriptor> tdCache) {
    // -1 is the "not provided" sentinel for both optional timestamp fields.
    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
    long initiatingMasterActiveTime =
      request.hasInitiatingMasterActiveTime() ? request.getInitiatingMasterActiveTime() : -1;
    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableName tableName = regionInfo.getTable();
      TableDescriptor tableDesc = tdCache.get(tableName);
      if (tableDesc == null) {
        try {
          tableDesc = server.getTableDescriptors().get(regionInfo.getTable());
        } catch (IOException e) {
          // Here we do not fail the whole method since we also need to deal with other
          // procedures, and we can not ignore this one, so we still schedule an
          // AssignRegionHandler and it will report back to master if we still can not get the
          // TableDescriptor.
          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
            regionInfo.getTable(), e);
        }
        if (tableDesc != null) {
          // Cache the descriptor for the remaining regions of the same table in this request.
          tdCache.put(tableName, tableDesc);
        }
      }
      if (regionOpenInfo.getFavoredNodesCount() > 0) {
        server.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
          regionOpenInfo.getFavoredNodesList());
      }
      long procId = regionOpenInfo.getOpenProcId();
      // NOTE(review): only schedule the handler when the server accepts the procedure id —
      // presumably false means it was already submitted; confirm in the server implementation.
      if (server.submitRegionProcedure(procId)) {
        server.getExecutorService().submit(AssignRegionHandler.create(server, regionInfo, procId,
          tableDesc, masterSystemTime, initiatingMasterActiveTime));
      }
    }
  }

  /**
   * Schedules an UnassignRegionHandler for a close-region request if the server accepts the
   * procedure id. The region name in the request is expected to always be decodable here,
   * hence the conversion of the checked exception into an unchecked one.
   */
  private void executeCloseRegionProcedures(CloseRegionRequest request) {
    String encodedName;
    long initiatingMasterActiveTime =
      request.hasInitiatingMasterActiveTime() ? request.getInitiatingMasterActiveTime() : -1;
    try {
      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
    } catch (DoNotRetryIOException e) {
      throw new UncheckedIOException("Should not happen", e);
    }
    // Destination server is optional; null means no explicit destination was given.
    ServerName destination = request.hasDestinationServer()
      ?
        ProtobufUtil.toServerName(request.getDestinationServer())
      : null;
    long procId = request.getCloseProcId();
    boolean evictCache = request.getEvictCache();
    if (server.submitRegionProcedure(procId)) {
      server.getExecutorService().submit(UnassignRegionHandler.create(server, encodedName, procId,
        false, destination, evictCache, initiatingMasterActiveTime));
    }
  }

  /**
   * Instantiates and executes a single remote procedure sent by the master. An instantiation
   * failure is reported back to the master via remoteProcedureComplete instead of being
   * rethrown, so one bad procedure does not block the others.
   */
  private void executeProcedures(RemoteProcedureRequest request) {
    RSProcedureCallable callable;
    try {
      // The procedure class is loaded reflectively by name and must expose a no-arg
      // constructor.
      callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
        .getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(),
        request.getProcId(), e);
      server.remoteProcedureComplete(request.getProcId(), request.getInitiatingMasterActiveTime(),
        e);
      return;
    }
    callable.init(request.getProcData().toByteArray(), server);
    LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
    server.executeProcedure(request.getProcId(), request.getInitiatingMasterActiveTime(), callable);
  }

  /**
   * Batch entry point for master-initiated procedures: open-region, close-region and generic
   * remote procedures, in that order, bracketed by the coprocessor pre/post hooks.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ExecuteProceduresResponse executeProcedures(RpcController controller,
    ExecuteProceduresRequest request) throws ServiceException {
    try {
      checkOpen();
      throwOnWrongStartCode(request);
      server.getRegionServerCoprocessorHost().preExecuteProcedures();
      if (request.getOpenRegionCount() > 0) {
        // Avoid reading from the TableDescriptor every time (usually it will read from the
        // file system)
        Map<TableName, TableDescriptor> tdCache = new HashMap<>();
        request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
      }
      if (request.getCloseRegionCount() > 0) {
        request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
      }
      if (request.getProcCount() > 0) {
        request.getProcList().forEach(this::executeProcedures);
      }
      server.getRegionServerCoprocessorHost().postExecuteProcedures();
      return ExecuteProceduresResponse.getDefaultInstance();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  /** Returns all bootstrap nodes known to this server, converted to protobuf ServerNames. */
  @Override
  public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller,
    GetAllBootstrapNodesRequest request) throws ServiceException {
    GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder();
    server.getBootstrapNodes()
      .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server)));
    return builder.build();
  }

  /**
   * (Re)reads the guardrail settings that may change at runtime: the batch-rows warning
   * threshold, whether oversized batches are rejected, and the server-side maximum scanner
   * result size. Re-applied from onConfigurationChange.
   */
  private void setReloadableGuardrails(Configuration conf) {
    rowSizeWarnThreshold =
      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    rejectRowsWithSizeOverThreshold =
      conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
    maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
  }

  /** Re-applies the reloadable guardrail settings when the live configuration changes. */
  @Override
  public void onConfigurationChange(Configuration conf) {
    super.onConfigurationChange(conf);
    setReloadableGuardrails(conf);
  }

  /** Lists the files that are fully cached in the block cache, if a block cache exists. */
  @Override
  public GetCachedFilesListResponse getCachedFilesList(RpcController controller,
    GetCachedFilesListRequest request) throws ServiceException {
    GetCachedFilesListResponse.Builder responseBuilder = GetCachedFilesListResponse.newBuilder();
    List<String> fullyCachedFiles = new ArrayList<>();
    server.getBlockCache().flatMap(BlockCache::getFullyCachedFiles).ifPresent(fcf -> {
      fullyCachedFiles.addAll(fcf.keySet());
    });
    return responseBuilder.addAllCachedFiles(fullyCachedFiles).build();
  }

  /**
   * Checks the scan quota and resolves the scanner context for a scan request. A request that
   * carries an existing scanner id has the id echoed back on the response builder and the
   * existing holder's block-bytes-scanned history feeds the quota check; otherwise a new region
   * scanner is created and the quota is checked with a zeroed history.
   */
  RegionScannerContext checkQuotaAndGetRegionScannerContext(ScanRequest request,
    ScanResponse.Builder builder) throws IOException {
    if (request.hasScannerId()) {
      // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
      // for more details.
      long scannerId = request.getScannerId();
      builder.setScannerId(scannerId);
      String scannerName = toScannerName(scannerId);
      RegionScannerHolder rsh = getRegionScanner(request);
      OperationQuota quota =
        getRpcQuotaManager().checkScanQuota(rsh.r, request, maxScannerResultSize,
          rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference());
      return new RegionScannerContext(scannerName, rsh, quota);
    }

    // No scanner id: open a fresh region scanner for this request.
    HRegion region = getRegion(request.getRegion());
    OperationQuota quota =
      getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize, 0L, 0L);
    Pair<String, RegionScannerHolder> pair = newRegionScanner(request, region, builder);
    return new RegionScannerContext(pair.getFirst(), pair.getSecond(), quota);
  }
}