001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.UncheckedIOException; 024import java.net.BindException; 025import java.net.InetAddress; 026import java.net.InetSocketAddress; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.Iterator; 032import java.util.List; 033import java.util.Map; 034import java.util.Map.Entry; 035import java.util.NavigableMap; 036import java.util.Set; 037import java.util.TreeSet; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentMap; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicBoolean; 042import java.util.concurrent.atomic.AtomicLong; 043import java.util.concurrent.atomic.LongAdder; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.fs.FileSystem; 046import org.apache.hadoop.fs.Path; 047import org.apache.hadoop.hbase.CacheEvictionStats; 048import 
org.apache.hadoop.hbase.CacheEvictionStatsBuilder; 049import org.apache.hadoop.hbase.Cell; 050import org.apache.hadoop.hbase.CellScannable; 051import org.apache.hadoop.hbase.CellScanner; 052import org.apache.hadoop.hbase.CellUtil; 053import org.apache.hadoop.hbase.DoNotRetryIOException; 054import org.apache.hadoop.hbase.DroppedSnapshotException; 055import org.apache.hadoop.hbase.HBaseIOException; 056import org.apache.hadoop.hbase.HBaseRpcServicesBase; 057import org.apache.hadoop.hbase.HConstants; 058import org.apache.hadoop.hbase.MultiActionResultTooLarge; 059import org.apache.hadoop.hbase.NotServingRegionException; 060import org.apache.hadoop.hbase.PrivateCellUtil; 061import org.apache.hadoop.hbase.RegionTooBusyException; 062import org.apache.hadoop.hbase.Server; 063import org.apache.hadoop.hbase.ServerName; 064import org.apache.hadoop.hbase.TableName; 065import org.apache.hadoop.hbase.UnknownScannerException; 066import org.apache.hadoop.hbase.client.Append; 067import org.apache.hadoop.hbase.client.CheckAndMutate; 068import org.apache.hadoop.hbase.client.CheckAndMutateResult; 069import org.apache.hadoop.hbase.client.Delete; 070import org.apache.hadoop.hbase.client.Durability; 071import org.apache.hadoop.hbase.client.Get; 072import org.apache.hadoop.hbase.client.Increment; 073import org.apache.hadoop.hbase.client.Mutation; 074import org.apache.hadoop.hbase.client.OperationWithAttributes; 075import org.apache.hadoop.hbase.client.Put; 076import org.apache.hadoop.hbase.client.RegionInfo; 077import org.apache.hadoop.hbase.client.RegionReplicaUtil; 078import org.apache.hadoop.hbase.client.Result; 079import org.apache.hadoop.hbase.client.Row; 080import org.apache.hadoop.hbase.client.Scan; 081import org.apache.hadoop.hbase.client.TableDescriptor; 082import org.apache.hadoop.hbase.client.VersionInfoUtil; 083import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 084import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 085import 
org.apache.hadoop.hbase.exceptions.ScannerResetException; 086import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 087import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 088import org.apache.hadoop.hbase.io.ByteBuffAllocator; 089import org.apache.hadoop.hbase.io.hfile.BlockCache; 090import org.apache.hadoop.hbase.ipc.HBaseRpcController; 091import org.apache.hadoop.hbase.ipc.PriorityFunction; 092import org.apache.hadoop.hbase.ipc.QosPriority; 093import org.apache.hadoop.hbase.ipc.RpcCall; 094import org.apache.hadoop.hbase.ipc.RpcCallContext; 095import org.apache.hadoop.hbase.ipc.RpcCallback; 096import org.apache.hadoop.hbase.ipc.RpcServer; 097import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 098import org.apache.hadoop.hbase.ipc.RpcServerFactory; 099import org.apache.hadoop.hbase.ipc.RpcServerInterface; 100import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 101import org.apache.hadoop.hbase.ipc.ServerRpcController; 102import org.apache.hadoop.hbase.net.Address; 103import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 104import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; 105import org.apache.hadoop.hbase.quotas.OperationQuota; 106import org.apache.hadoop.hbase.quotas.QuotaUtil; 107import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 108import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 109import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 110import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; 111import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease; 112import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException; 113import org.apache.hadoop.hbase.regionserver.Region.Operation; 114import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 115import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 116import 
org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler; 117import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; 118import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; 119import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; 120import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; 121import org.apache.hadoop.hbase.replication.ReplicationUtils; 122import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker; 123import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker; 124import org.apache.hadoop.hbase.security.Superusers; 125import org.apache.hadoop.hbase.security.access.Permission; 126import org.apache.hadoop.hbase.util.Bytes; 127import org.apache.hadoop.hbase.util.DNS; 128import org.apache.hadoop.hbase.util.DNS.ServerType; 129import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 130import org.apache.hadoop.hbase.util.Pair; 131import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 132import org.apache.hadoop.hbase.wal.WAL; 133import org.apache.hadoop.hbase.wal.WALEdit; 134import org.apache.hadoop.hbase.wal.WALKey; 135import org.apache.hadoop.hbase.wal.WALSplitUtil; 136import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 137import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 138import org.apache.yetus.audience.InterfaceAudience; 139import org.slf4j.Logger; 140import org.slf4j.LoggerFactory; 141 142import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 143import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 144import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 145import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 146import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 147import org.apache.hbase.thirdparty.com.google.protobuf.Message; 148import 
org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 149import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 150import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 151import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 152import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 153 154import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 155import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 156import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 171import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 191import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; 212import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; 234import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;

/**
 * Implements the regionserver RPC services.
 */
@InterfaceAudience.Private
public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
  implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface {

  private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);

  /** RPC scheduler to use for the region server. */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Volatile: re-read per request; updated outside this view (presumably on config
  // (re)load — confirm where it is assigned before relying on that).
  private volatile long maxScannerResultSize;

  private ScannerIdGenerator scannerIdGenerator;
  // Live scanners, keyed by scanner name. Entries are removed on close or lease expiry.
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name and last sequence number of a closed scanner for a while. This is used
  // to keep compatible for old clients which may send next or close request to a region
  // scanner which has already been exhausted. The entries will be removed automatically
  // after scannerLeaseTimeoutPeriod.
  private final Cache<String, Long> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private volatile int rowSizeWarnThreshold;
  /*
   * Whether we should reject requests with very high no of rows i.e. beyond threshold defined by
   * rowSizeWarnThreshold
   */
  private volatile boolean rejectRowsWithSizeOverThreshold;

  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below booleans
   * to selectively enable/disable these services (Rare is the case where you would ever turn off
   * one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
    "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
    "hbase.regionserver.client.executorService";
  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
    "hbase.regionserver.client.meta.executorService";
  public static final String REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG =
    "hbase.regionserver.bootstrap.nodes.executorService";

  /**
   * An Rpc callback for closing a RegionScanner.
354 */ 355 private static final class RegionScannerCloseCallBack implements RpcCallback { 356 357 private final RegionScanner scanner; 358 359 public RegionScannerCloseCallBack(RegionScanner scanner) { 360 this.scanner = scanner; 361 } 362 363 @Override 364 public void run() throws IOException { 365 this.scanner.close(); 366 } 367 } 368 369 /** 370 * An Rpc callback for doing shipped() call on a RegionScanner. 371 */ 372 private class RegionScannerShippedCallBack implements RpcCallback { 373 private final String scannerName; 374 private final Shipper shipper; 375 private final Lease lease; 376 377 public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) { 378 this.scannerName = scannerName; 379 this.shipper = shipper; 380 this.lease = lease; 381 } 382 383 @Override 384 public void run() throws IOException { 385 this.shipper.shipped(); 386 // We're done. On way out re-add the above removed lease. The lease was temp removed for this 387 // Rpc call and we are at end of the call now. Time to add it back. 388 if (scanners.containsKey(scannerName)) { 389 if (lease != null) { 390 server.getLeaseManager().addLease(lease); 391 } 392 } 393 } 394 } 395 396 /** 397 * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on 398 * completion of multiGets. 399 */ 400 static class RegionScannersCloseCallBack implements RpcCallback { 401 private final List<RegionScanner> scanners = new ArrayList<>(); 402 403 public void addScanner(RegionScanner scanner) { 404 this.scanners.add(scanner); 405 } 406 407 @Override 408 public void run() { 409 for (RegionScanner scanner : scanners) { 410 try { 411 scanner.close(); 412 } catch (IOException e) { 413 LOG.error("Exception while closing the scanner " + scanner, e); 414 } 415 } 416 } 417 } 418 419 /** 420 * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together. 
   */
  static final class RegionScannerHolder {
    // Sequence number expected from the client on its next scan call; advanced via CAS
    // so two concurrent requests cannot both run against the same scanner.
    private final AtomicLong nextCallSeq = new AtomicLong(0);
    private final RegionScanner s;
    private final HRegion r;
    private final RpcCallback closeCallBack;
    private final RpcCallback shippedCallback;
    private byte[] rowOfLastPartialResult;
    private boolean needCursor;
    private boolean fullRegionScan;
    private final String clientIPAndPort;
    private final String userName;
    // Volatile: written by the request thread, read when reporting metrics/diagnostics.
    private volatile long maxBlockBytesScanned = 0;
    private volatile long prevBlockBytesScanned = 0;
    private volatile long prevBlockBytesScannedDifference = 0;

    RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
      RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
      String clientIPAndPort, String userName) {
      this.s = s;
      this.r = r;
      this.closeCallBack = closeCallBack;
      this.shippedCallback = shippedCallback;
      this.needCursor = needCursor;
      this.fullRegionScan = fullRegionScan;
      this.clientIPAndPort = clientIPAndPort;
      this.userName = userName;
    }

    long getNextCallSeq() {
      return nextCallSeq.get();
    }

    boolean incNextCallSeq(long currentSeq) {
      // Use CAS to prevent multiple scan request running on the same scanner.
      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
    }

    long getMaxBlockBytesScanned() {
      return maxBlockBytesScanned;
    }

    long getPrevBlockBytesScannedDifference() {
      return prevBlockBytesScannedDifference;
    }

    // Records the cumulative block bytes scanned after a call: tracks the per-call delta
    // and the running maximum. blockBytesScanned is expected to be monotonically
    // non-decreasing across calls for this scanner.
    void updateBlockBytesScanned(long blockBytesScanned) {
      prevBlockBytesScannedDifference = blockBytesScanned - prevBlockBytesScanned;
      prevBlockBytesScanned = blockBytesScanned;
      if (blockBytesScanned > maxBlockBytesScanned) {
        maxBlockBytesScanned = blockBytesScanned;
      }
    }

    // Should be called only when we need to print lease expired messages otherwise
    // cache the String once made.
    @Override
    public String toString() {
      return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName
        + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString();
    }
  }

  /**
   * Instantiated as a scanner lease. If the lease times out, the scanner is closed
   */
  private class ScannerListener implements LeaseListener {
    private final String scannerName;

    ScannerListener(final String n) {
      this.scannerName = n;
    }

    @Override
    public void leaseExpired() {
      // Remove first so concurrent requests can no longer find the scanner.
      RegionScannerHolder rsh = scanners.remove(this.scannerName);
      if (rsh == null) {
        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
        return;
      }
      LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
      server.getMetrics().incrScannerLeaseExpired();
      RegionScanner s = rsh.s;
      HRegion region = null;
      try {
        region = server.getRegion(s.getRegionInfo().getRegionName());
        if (region != null && region.getCoprocessorHost() != null) {
          region.getCoprocessorHost().preScannerClose(s);
        }
      } catch (IOException e) {
        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
      } finally {
        // Close the scanner even if the pre-close hook failed, then run the post-close hook.
        try {
          s.close();
          if (region != null && region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().postScannerClose(s);
          }
        } catch (IOException e) {
          LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
        }
      }
    }
  }

  // Wraps a protobuf Result into a ResultOrException slot for the given action index.
  private static ResultOrException getResultOrException(final ClientProtos.Result r,
    final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(r), index);
  }

  // Wraps an exception into a ResultOrException slot for the given action index.
  private static ResultOrException getResultOrException(final Exception e, final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(e), index);
  }

  private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
    final int index)
{
    return builder.setIndex(index).build();
  }

  /**
   * Checks for the following pre-checks in order:
   * <ol>
   * <li>RegionServer is running</li>
   * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
   * </ol>
   * @param requestName name of rpc request. Used in reporting failures to provide context.
   * @throws ServiceException If any of the above listed pre-check fails.
   */
  private void rpcPreCheck(String requestName) throws ServiceException {
    try {
      checkOpen();
      requirePermission(requestName, Permission.Action.ADMIN);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  // True only when there is a call context and the client negotiated cell-block support.
  private boolean isClientCellBlockSupport(RpcCallContext context) {
    if (context == null) {
      return false;
    }
    return context.isClientCellBlockSupported();
  }

  // Attaches a mutation result to the response, using the cell-block side channel when the
  // client supports it and inline protobuf cells otherwise. A null result adds nothing.
  private void addResult(final MutateResponse.Builder builder, final Result result,
    final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
    if (result == null) {
      return;
    }
    if (clientCellBlockSupported) {
      builder.setResult(ProtobufUtil.toResultNoData(result));
      rpcc.setCellScanner(result.cellScanner());
    } else {
      builder.setResult(ProtobufUtil.toResult(result));
    }
  }

  // Attaches scan results to the response. Marks the response stale for non-default
  // replicas; empty result lists add nothing beyond the stale flag.
  private void addResults(ScanResponse.Builder builder, List<Result> results,
    HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
    builder.setStale(!isDefaultRegion);
    if (results.isEmpty()) {
      return;
    }
    if (clientCellBlockSupported) {
      // Cells travel out-of-band; the response only carries per-result counts and
      // partial-row flags so the client can reassemble.
      results.forEach(result -> {
        builder.addCellsPerResult(result.size());
        builder.addPartialFlagPerResult(result.mayHaveMoreCellsInRow());
      });
      controller.setCellScanner(CellUtil.createCellScanner(results));
    } else {
      results.forEach(result -> builder.addResults(ProtobufUtil.toResult(result)));
    }
  }

  private CheckAndMutateResult
checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
    CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Count of mutations whose cells were fully consumed from the cell scanner. Used in
    // the finally block to skip the cells of any remaining actions so the shared scanner
    // stays aligned for whatever follows, even if we throw part-way through.
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      List<Mutation> mutations = new ArrayList<>();
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : actions) {
        // Only mutations are atomic-able; a Get in the action list is a client error.
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto mutation = action.getMutation();
        MutationType type = mutation.getMutateType();
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(mutation, cellScanner);
            // Cells consumed: count before the checks below may throw.
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            mutations.add(put);
            break;
          case DELETE:
            Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
            ++countOfCompleteMutation;
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            mutations.add(del);
            break;
          case INCREMENT:
            Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
            // Nonce is overwritten by each INCREMENT/APPEND action seen.
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, increment);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
            mutations.add(increment);
            break;
          case APPEND:
            Append append = ProtobufUtil.toAppend(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, append);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
            mutations.add(append);
            break;
          default:
            throw new DoNotRetryIOException("invalid mutation type : " + type);
        }
      }

      if (mutations.size() == 0) {
        // Nothing to mutate: report a successful (vacuous) check.
        return new CheckAndMutateResult(true, null);
      } else {
        CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
        CheckAndMutateResult result = null;
        // preCheckAndMutate may bypass the default operation by returning non-null.
        if (region.getCoprocessorHost() != null) {
          result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
        }
        if (result == null) {
          result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
          if (region.getCoprocessorHost() != null) {
            result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
          }
        }
        return result;
      }
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }

  /**
   * Execute an append mutation.
   * @return result to return to client if default operation should be bypassed as indicated by
   *         RegionObserver, null otherwise
   */
  private Result append(final HRegion region, final OperationQuota quota,
    final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
    checkCellSizeLimit(region, append);
    spaceQuota.getPolicyEnforcement(region).check(append);
    quota.addMutation(append);
    // Snapshot block bytes scanned so the metrics update below reports only this call's delta.
    long blockBytesScannedBefore = context != null ?
context.getBlockBytesScanned() : 0; 684 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 685 Result r = region.append(append, nonceGroup, nonce); 686 if (server.getMetrics() != null) { 687 long blockBytesScanned = 688 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 689 server.getMetrics().updateAppend(region, EnvironmentEdgeManager.currentTime() - before, 690 blockBytesScanned); 691 } 692 return r == null ? Result.EMPTY_RESULT : r; 693 } 694 695 /** 696 * Execute an increment mutation. 697 */ 698 private Result increment(final HRegion region, final OperationQuota quota, 699 final MutationProto mutation, final CellScanner cells, long nonceGroup, 700 ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException { 701 long before = EnvironmentEdgeManager.currentTime(); 702 Increment increment = ProtobufUtil.toIncrement(mutation, cells); 703 checkCellSizeLimit(region, increment); 704 spaceQuota.getPolicyEnforcement(region).check(increment); 705 quota.addMutation(increment); 706 long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0; 707 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 708 Result r = region.increment(increment, nonceGroup, nonce); 709 final MetricsRegionServer metricsRegionServer = server.getMetrics(); 710 if (metricsRegionServer != null) { 711 long blockBytesScanned = 712 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 713 metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before, 714 blockBytesScanned); 715 } 716 return r == null ? Result.EMPTY_RESULT : r; 717 } 718 719 /** 720 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when 721 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation. 722 * @param cellsToReturn Could be null. May be allocated in this method. 
   *                      This is what this method returns as a 'result'.
   * @param closeCallBack the callback to be used with multigets
   * @param context       the current RpcCallContext
   * @return Return the <code>cellScanner</code> passed
   */
  private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
    final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
    final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
    final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
    // one at a time, we instead pass them in batch. Be aware that the corresponding
    // ResultOrException instance that matches each Put or Delete is then added down in the
    // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
    // deferred/batched
    List<ClientProtos.Action> mutations = null;
    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
    IOException sizeIOE = null;
    // One builder instance reused (cleared) per action to avoid per-action allocation.
    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
      ResultOrException.newBuilder();
    boolean hasResultOrException = false;
    for (ClientProtos.Action action : actions.getActionList()) {
      hasResultOrException = false;
      resultOrExceptionBuilder.clear();
      try {
        Result r = null;
        long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
        if (
          context != null && context.isRetryImmediatelySupported()
            && (context.getResponseCellSize() > maxQuotaResultSize
              || blockBytesScannedBefore + context.getResponseExceptionSize() > maxQuotaResultSize)
        ) {

          // We're storing the exception since the exception and reason string won't
          // change after the response size limit is reached.
          if (sizeIOE == null) {
            // We don't need the stack un-winding so don't throw the exception.
            // Throwing will kill the JVM's JIT.
            //
            // Instead just create the exception and then store it.
            sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
              + context.getResponseCellSize() + " BlockSize: " + blockBytesScannedBefore);

            // Only report the exception once since there's only one request that
            // caused the exception. Otherwise this number will dominate the exceptions count.
            rpcServer.getMetrics().exception(sizeIOE);
          }

          // Now that there's an exception is known to be created
          // use it for the response.
          //
          // This will create a copy in the builder.
          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
          resultOrExceptionBuilder.setException(pair);
          context.incrementResponseExceptionSize(pair.getSerializedSize());
          resultOrExceptionBuilder.setIndex(action.getIndex());
          builder.addResultOrException(resultOrExceptionBuilder.build());
          // Keep the shared cell scanner aligned even though this action is skipped.
          skipCellsForMutation(action, cellScanner);
          continue;
        }
        if (action.hasGet()) {
          long before = EnvironmentEdgeManager.currentTime();
          ClientProtos.Get pbGet = action.getGet();
          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
          // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
              + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
              + "reverse Scan.");
          }
          try {
            Get get = ProtobufUtil.toGet(pbGet);
            if (context != null) {
              r = get(get, (region), closeCallBack, context);
            } else {
              r = region.get(get);
            }
          } finally {
            final MetricsRegionServer metricsRegionServer = server.getMetrics();
            if (metricsRegionServer != null) {
              long blockBytesScanned =
                context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
              metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
                blockBytesScanned);
            }
          }
        } else if (action.hasServiceCall()) {
          hasResultOrException = true;
          Message result = execServiceOnRegion(region, action.getServiceCall());
          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
            ClientProtos.CoprocessorServiceResult.newBuilder();
          resultOrExceptionBuilder.setServiceResult(serviceResultBuilder
            .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName())
              // TODO: Copy!!!
              .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
        } else if (action.hasMutation()) {
          MutationType type = action.getMutation().getMutateType();
          if (
            type != MutationType.PUT && type != MutationType.DELETE && mutations != null
              && !mutations.isEmpty()
          ) {
            // Flush out any Puts or Deletes already collected.
            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
              spaceQuotaEnforcement);
            mutations.clear();
          }
          switch (type) {
            case APPEND:
              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement, context);
              break;
            case INCREMENT:
              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement, context);
              break;
            case PUT:
            case DELETE:
              // Collect the individual mutations and apply in a batch
              if (mutations == null) {
                mutations = new ArrayList<>(actions.getActionCount());
              }
              mutations.add(action);
              break;
            default:
              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
          }
        } else {
          throw new HBaseIOException("Unexpected Action type");
        }
        if (r != null) {
          ClientProtos.Result pbResult = null;
          if (isClientCellBlockSupport(context)) {
            pbResult = ProtobufUtil.toResultNoData(r);
            // Hard to guess the size here. Just make a rough guess.
            if (cellsToReturn == null) {
              cellsToReturn = new ArrayList<>();
            }
            cellsToReturn.add(r);
          } else {
            pbResult = ProtobufUtil.toResult(r);
          }
          addSize(context, r);
          hasResultOrException = true;
          resultOrExceptionBuilder.setResult(pbResult);
        }
        // Could get to here and there was no result and no exception. Presumes we added
        // a Put or Delete to the collecting Mutations List for adding later. In this
        // case the corresponding ResultOrException instance for the Put or Delete will be added
        // down in the doNonAtomicBatchOp method call rather than up here.
      } catch (IOException ie) {
        rpcServer.getMetrics().exception(ie);
        hasResultOrException = true;
        NameBytesPair pair = ResponseConverter.buildException(ie);
        resultOrExceptionBuilder.setException(pair);
        context.incrementResponseExceptionSize(pair.getSerializedSize());
      }
      if (hasResultOrException) {
        // Propagate index.
        resultOrExceptionBuilder.setIndex(action.getIndex());
        builder.addResultOrException(resultOrExceptionBuilder.build());
      }
    }
    // Finish up any outstanding mutations
    if (!CollectionUtils.isEmpty(mutations)) {
      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
    }
    return cellsToReturn;
  }

  /**
   * Reject the mutation if any single cell exceeds the region's configured max cell size.
   * A non-positive {@code maxCellSize} disables the check.
   */
  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
    if (r.maxCellSize > 0) {
      CellScanner cells = m.cellScanner();
      while (cells.advance()) {
        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
        if (size > r.maxCellSize) {
          String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of "
            + r.maxCellSize + " bytes";
          LOG.debug(msg);
          throw new DoNotRetryIOException(msg);
        }
      }
    }
  }

  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Just throw the exception. The exception will be caught and then added to region-level
    // exception for RegionAction. Leaving the null to action result is ok since the null
    // result is viewed as failure by hbase client. And the region-level exception will be used
    // to replace the null result. see AsyncRequestFutureImpl#receiveMultiAction and
    // AsyncBatchRpcRetryingCaller#onComplete for more details.
917 doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true); 918 } 919 920 private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, 921 final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells, 922 ActivePolicyEnforcement spaceQuotaEnforcement) { 923 try { 924 doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE, 925 spaceQuotaEnforcement, false); 926 } catch (IOException e) { 927 // Set the exception for each action. The mutations in same RegionAction are group to 928 // different batch and then be processed individually. Hence, we don't set the region-level 929 // exception here for whole RegionAction. 930 for (Action mutation : mutations) { 931 builder.addResultOrException(getResultOrException(e, mutation.getIndex())); 932 } 933 } 934 } 935 936 /** 937 * Execute a list of mutations. 938 */ 939 private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region, 940 final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells, 941 long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic) 942 throws IOException { 943 Mutation[] mArray = new Mutation[mutations.size()]; 944 long before = EnvironmentEdgeManager.currentTime(); 945 boolean batchContainsPuts = false, batchContainsDelete = false; 946 try { 947 /** 948 * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions 949 * since mutation array may have been reoredered.In order to return the right result or 950 * exception to the corresponding actions, We need to know which action is the mutation belong 951 * to. We can't sort ClientProtos.Action array, since they are bonded to cellscanners. 
952 */ 953 Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>(); 954 int i = 0; 955 long nonce = HConstants.NO_NONCE; 956 for (ClientProtos.Action action : mutations) { 957 if (action.hasGet()) { 958 throw new DoNotRetryIOException( 959 "Atomic put and/or delete only, not a Get=" + action.getGet()); 960 } 961 MutationProto m = action.getMutation(); 962 Mutation mutation; 963 switch (m.getMutateType()) { 964 case PUT: 965 mutation = ProtobufUtil.toPut(m, cells); 966 batchContainsPuts = true; 967 break; 968 969 case DELETE: 970 mutation = ProtobufUtil.toDelete(m, cells); 971 batchContainsDelete = true; 972 break; 973 974 case INCREMENT: 975 mutation = ProtobufUtil.toIncrement(m, cells); 976 nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE; 977 break; 978 979 case APPEND: 980 mutation = ProtobufUtil.toAppend(m, cells); 981 nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE; 982 break; 983 984 default: 985 throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType()); 986 } 987 mutationActionMap.put(mutation, action); 988 mArray[i++] = mutation; 989 checkCellSizeLimit(region, mutation); 990 // Check if a space quota disallows this mutation 991 spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation); 992 quota.addMutation(mutation); 993 } 994 995 if (!region.getRegionInfo().isMetaRegion()) { 996 server.getMemStoreFlusher().reclaimMemStoreMemory(); 997 } 998 999 // HBASE-17924 1000 // Sort to improve lock efficiency for non-atomic batch of operations. If atomic 1001 // order is preserved as its expected from the client 1002 if (!atomic) { 1003 Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2)); 1004 } 1005 1006 OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce); 1007 1008 // When atomic is true, it indicates that the mutateRow API or the batch API with 1009 // RowMutations is called. 
In this case, we need to merge the results of the 1010 // Increment/Append operations if the mutations include those operations, and set the merged 1011 // result to the first element of the ResultOrException list 1012 if (atomic) { 1013 List<ResultOrException> resultOrExceptions = new ArrayList<>(); 1014 List<Result> results = new ArrayList<>(); 1015 for (i = 0; i < codes.length; i++) { 1016 if (codes[i].getResult() != null) { 1017 results.add(codes[i].getResult()); 1018 } 1019 if (i != 0) { 1020 resultOrExceptions 1021 .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i)); 1022 } 1023 } 1024 1025 if (results.isEmpty()) { 1026 builder.addResultOrException( 1027 getResultOrException(ClientProtos.Result.getDefaultInstance(), 0)); 1028 } else { 1029 // Merge the results of the Increment/Append operations 1030 List<Cell> cellList = new ArrayList<>(); 1031 for (Result result : results) { 1032 if (result.rawCells() != null) { 1033 cellList.addAll(Arrays.asList(result.rawCells())); 1034 } 1035 } 1036 Result result = Result.create(cellList); 1037 1038 // Set the merged result of the Increment/Append operations to the first element of the 1039 // ResultOrException list 1040 builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0)); 1041 } 1042 1043 builder.addAllResultOrException(resultOrExceptions); 1044 return; 1045 } 1046 1047 for (i = 0; i < codes.length; i++) { 1048 Mutation currentMutation = mArray[i]; 1049 ClientProtos.Action currentAction = mutationActionMap.get(currentMutation); 1050 int index = currentAction.hasIndex() ? 
currentAction.getIndex() : i; 1051 Exception e; 1052 switch (codes[i].getOperationStatusCode()) { 1053 case BAD_FAMILY: 1054 e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg()); 1055 builder.addResultOrException(getResultOrException(e, index)); 1056 break; 1057 1058 case SANITY_CHECK_FAILURE: 1059 e = new FailedSanityCheckException(codes[i].getExceptionMsg()); 1060 builder.addResultOrException(getResultOrException(e, index)); 1061 break; 1062 1063 default: 1064 e = new DoNotRetryIOException(codes[i].getExceptionMsg()); 1065 builder.addResultOrException(getResultOrException(e, index)); 1066 break; 1067 1068 case SUCCESS: 1069 ClientProtos.Result result = codes[i].getResult() == null 1070 ? ClientProtos.Result.getDefaultInstance() 1071 : ProtobufUtil.toResult(codes[i].getResult()); 1072 builder.addResultOrException(getResultOrException(result, index)); 1073 break; 1074 1075 case STORE_TOO_BUSY: 1076 e = new RegionTooBusyException(codes[i].getExceptionMsg()); 1077 builder.addResultOrException(getResultOrException(e, index)); 1078 break; 1079 } 1080 } 1081 } finally { 1082 int processedMutationIndex = 0; 1083 for (Action mutation : mutations) { 1084 // The non-null mArray[i] means the cell scanner has been read. 
1085 if (mArray[processedMutationIndex++] == null) { 1086 skipCellsForMutation(mutation, cells); 1087 } 1088 } 1089 updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete); 1090 } 1091 } 1092 1093 private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts, 1094 boolean batchContainsDelete) { 1095 final MetricsRegionServer metricsRegionServer = server.getMetrics(); 1096 if (metricsRegionServer != null) { 1097 long after = EnvironmentEdgeManager.currentTime(); 1098 if (batchContainsPuts) { 1099 metricsRegionServer.updatePutBatch(region, after - starttime); 1100 } 1101 if (batchContainsDelete) { 1102 metricsRegionServer.updateDeleteBatch(region, after - starttime); 1103 } 1104 } 1105 } 1106 1107 /** 1108 * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of 1109 * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse. 1110 * @return an array of OperationStatus which internally contains the OperationStatusCode and the 1111 * exceptionMessage if any 1112 * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying 1113 * edits for secondary replicas any more, see 1114 * {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}. 
   */
  @Deprecated
  private OperationStatus[] doReplayBatchOp(final HRegion region,
    final List<MutationReplay> mutations, long replaySeqId) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
        MutationReplay m = it.next();

        if (m.getType() == MutationType.PUT) {
          batchContainsPuts = true;
        } else {
          batchContainsDelete = true;
        }

        // WAL edits tagged with METAFAMILY carry markers (compaction/flush/region-event/
        // bulk-load), not data; replay each marker and drop the entry from the batch.
        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
        if (metaCells != null && !metaCells.isEmpty()) {
          for (Cell metaCell : metaCells) {
            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
            HRegion hRegion = region;
            if (compactionDesc != null) {
              // replay the compaction. Remove the files from stores only if we are the primary
              // region replica (thus own the files)
              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
                replaySeqId);
              continue;
            }
            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
            if (flushDesc != null && !isDefaultReplica) {
              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
              continue;
            }
            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
            if (regionEvent != null && !isDefaultReplica) {
              hRegion.replayWALRegionEventMarker(regionEvent);
              continue;
            }
            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
            if (bulkLoadEvent != null) {
              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
              continue;
            }
          }
          // Marker entries are not replayed as data mutations.
          it.remove();
        }
      }
      requestCount.increment();
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
        replaySeqId);
    } finally {
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  private void closeAllScanners() {
    // Close any outstanding scanners. Means they'll get an UnknownScanner
    // exception next time they come in.
    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
      try {
        e.getValue().s.close();
      } catch (IOException ioe) {
        // Best effort: keep closing the remaining scanners even if one fails.
        LOG.warn("Closing scanner " + e.getKey(), ioe);
      }
    }
  }

  // Directly invoked only for testing
  public RSRpcServices(final HRegionServer rs) throws IOException {
    super(rs, rs.getProcessName());
    final Configuration conf = rs.getConfiguration();
    setReloadableGuardrails(conf);
    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
    rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
    // Entries for closed scanners expire after the scanner lease timeout.
    closedScanners = CacheBuilder.newBuilder()
      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }

  @Override
  protected boolean defaultReservoirEnabled() {
    return true;
  }

  @Override
  protected ServerType getDNSServerType() {
    return DNS.ServerType.REGIONSERVER;
  }

  @Override
  protected String getHostname(Configuration conf, String defaultHostname) {
    return conf.get("hbase.regionserver.ipc.address", defaultHostname);
  }

  @Override
  protected String getPortConfigName() {
    return HConstants.REGIONSERVER_PORT;
  }

  @Override
  protected int getDefaultPort() {
    return HConstants.DEFAULT_REGIONSERVER_PORT;
  }

  @Override
  protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  /**
   * Create the RPC server bound to the given address; a BindException is rewrapped with a hint
   * pointing at the port configuration property.
   */
  protected RpcServerInterface createRpcServer(final Server server,
    final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
    final String name) throws IOException {
    final Configuration conf = server.getConfiguration();
    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
    try {
      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, // use final
        // bindAddress
        // for this
        // server.
        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
    } catch (BindException be) {
      throw new IOException(be.getMessage() + ". To switch ports use the '"
        + HConstants.REGIONSERVER_PORT + "' configuration property.",
        be.getCause() != null ? be.getCause() : be);
    }
  }

  protected Class<?> getRpcSchedulerFactoryClass() {
    final Configuration conf = server.getConfiguration();
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  protected PriorityFunction createPriority() {
    return new RSAnnotationReadingPriorityFunction(this);
  }

  /** Returns the number of currently outstanding scanners on this server. */
  public int getScannersCount() {
    return scanners.size();
  }

  /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
  RegionScanner getScanner(long scannerId) {
    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
    return rsh == null ? null : rsh.s;
  }

  /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null.
   */
  private RegionScannerHolder getRegionScannerHolder(long scannerId) {
    return scanners.get(toScannerName(scannerId));
  }

  /**
   * Returns a human-readable "table/region/operation_id" summary for the scanner with the given
   * id, or null if the scanner is unknown.
   */
  public String getScanDetailsWithId(long scannerId) {
    RegionScanner scanner = getScanner(scannerId);
    if (scanner == null) {
      return null;
    }
    StringBuilder builder = new StringBuilder();
    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
    builder.append(" operation_id: ").append(scanner.getOperationId());
    return builder.toString();
  }

  /**
   * Returns a human-readable summary for the given scan request, or null if the request has no
   * region or the region cannot be found.
   */
  public String getScanDetailsWithRequest(ScanRequest request) {
    try {
      if (!request.hasRegion()) {
        return null;
      }
      Region region = getRegion(request.getRegion());
      StringBuilder builder = new StringBuilder();
      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
      for (NameBytesPair pair : request.getScan().getAttributeList()) {
        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
          break;
        }
      }
      return builder.toString();
    } catch (IOException ignored) {
      // Best-effort diagnostics only; treat a lookup failure as "no details".
      return null;
    }
  }

  /**
   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
   */
  long getScannerVirtualTime(long scannerId) {
    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
    return rsh == null ? 0L : rsh.getNextCallSeq();
  }

  /**
   * Method to account for the size of retained cells.
   * @param context rpc call context
   * @param r       result to add size.
   */
  void addSize(RpcCallContext context, Result r) {
    if (context != null && r != null && !r.isEmpty()) {
      for (Cell c : r.rawCells()) {
        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
      }
    }
  }

  /** Returns Remote client's ip and port else null if can't be determined. */
  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
    link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
  static String getRemoteClientIpAndPort() {
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    if (rpcCall == null) {
      return HConstants.EMPTY_STRING;
    }
    InetAddress address = rpcCall.getRemoteAddress();
    if (address == null) {
      return HConstants.EMPTY_STRING;
    }
    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
    // scanning than a hostname anyways.
    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
  }

  /** Returns Remote client's username.
*/ 1351 @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices", 1352 link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java") 1353 static String getUserName() { 1354 RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null); 1355 if (rpcCall == null) { 1356 return HConstants.EMPTY_STRING; 1357 } 1358 return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING); 1359 } 1360 1361 private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper, 1362 HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException { 1363 Lease lease = server.getLeaseManager().createLease(scannerName, this.scannerLeaseTimeoutPeriod, 1364 new ScannerListener(scannerName)); 1365 RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease); 1366 RpcCallback closeCallback = 1367 s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s); 1368 RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback, 1369 needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName()); 1370 RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh); 1371 assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! 
" 1372 + scannerName + ", " + existing; 1373 return rsh; 1374 } 1375 1376 private boolean isFullRegionScan(Scan scan, HRegion region) { 1377 // If the scan start row equals or less than the start key of the region 1378 // and stop row greater than equals end key (if stop row present) 1379 // or if the stop row is empty 1380 // account this as a full region scan 1381 if ( 1382 Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0 1383 && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0 1384 && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW) 1385 || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) 1386 ) { 1387 return true; 1388 } 1389 return false; 1390 } 1391 1392 /** 1393 * Find the HRegion based on a region specifier 1394 * @param regionSpecifier the region specifier 1395 * @return the corresponding region 1396 * @throws IOException if the specifier is not null, but failed to find the region 1397 */ 1398 public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException { 1399 return server.getRegion(regionSpecifier.getValue().toByteArray()); 1400 } 1401 1402 /** 1403 * Find the List of HRegions based on a list of region specifiers 1404 * @param regionSpecifiers the list of region specifiers 1405 * @return the corresponding list of regions 1406 * @throws IOException if any of the specifiers is not null, but failed to find the region 1407 */ 1408 private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers, 1409 final CacheEvictionStatsBuilder stats) { 1410 List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size()); 1411 for (RegionSpecifier regionSpecifier : regionSpecifiers) { 1412 try { 1413 regions.add(server.getRegion(regionSpecifier.getValue().toByteArray())); 1414 } catch (NotServingRegionException e) { 1415 stats.addException(regionSpecifier.getValue().toByteArray(), e); 1416 } 1417 } 1418 return regions; 1419 } 
  /** Returns the priority function used to classify incoming requests. */
  PriorityFunction getPriority() {
    return priority;
  }

  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return server.getRegionServerRpcQuotaManager();
  }

  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return server.getRegionServerSpaceQuotaManager();
  }

  void start(ZKWatcher zkWatcher) {
    // Scanner ids are generated per server name; see ScannerIdGenerator.
    this.scannerIdGenerator = new ScannerIdGenerator(this.server.getServerName());
    internalStart(zkWatcher);
  }

  void stop() {
    closeAllScanners();
    internalStop();
  }

  /**
   * Called to verify that this server is up and running.
   */
  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
  protected void checkOpen() throws IOException {
    if (server.isAborted()) {
      throw new RegionServerAbortedException("Server " + server.getServerName() + " aborting");
    }
    if (server.isStopped()) {
      throw new RegionServerStoppedException("Server " + server.getServerName() + " stopping");
    }
    if (!server.isDataFileSystemOk()) {
      throw new RegionServerStoppedException("File system not available");
    }
    if (!server.isOnline()) {
      throw new ServerNotRunningYetException(
        "Server " + server.getServerName() + " is not running yet");
    }
  }

  /**
   * By default, put up an Admin and a Client Service. Set booleans
   * <code>hbase.regionserver.admin.executorService</code> and
   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
   * Default is that both are enabled.
   * @return immutable list of blocking services and the security info classes that this server
   *         supports
   */
  protected List<BlockingServiceAndInterface> getServices() {
    boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
    boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
    boolean clientMeta =
      getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
    boolean bootstrapNodes =
      getConfiguration().getBoolean(REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG, true);
    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
    if (client) {
      bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
        ClientService.BlockingInterface.class));
    }
    if (admin) {
      bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
        AdminService.BlockingInterface.class));
    }
    if (clientMeta) {
      bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
        ClientMetaService.BlockingInterface.class));
    }
    if (bootstrapNodes) {
      bssi.add(
        new BlockingServiceAndInterface(BootstrapNodeService.newReflectiveBlockingService(this),
          BootstrapNodeService.BlockingInterface.class));
    }
    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
  }

  /**
   * Close a region on the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public CloseRegionResponse closeRegion(final RpcController controller,
    final CloseRegionRequest request) throws ServiceException {
    final ServerName sn = (request.hasDestinationServer() ?
ProtobufUtil.toServerName(request.getDestinationServer()) 1510 : null); 1511 1512 try { 1513 checkOpen(); 1514 throwOnWrongStartCode(request); 1515 final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion()); 1516 1517 requestCount.increment(); 1518 if (sn == null) { 1519 LOG.info("Close " + encodedRegionName + " without moving"); 1520 } else { 1521 LOG.info("Close " + encodedRegionName + ", moving to " + sn); 1522 } 1523 boolean closed = server.closeRegion(encodedRegionName, false, sn); 1524 CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed); 1525 return builder.build(); 1526 } catch (IOException ie) { 1527 throw new ServiceException(ie); 1528 } 1529 } 1530 1531 /** 1532 * Compact a region on the region server. 1533 * @param controller the RPC controller 1534 * @param request the request 1535 */ 1536 @Override 1537 @QosPriority(priority = HConstants.ADMIN_QOS) 1538 public CompactRegionResponse compactRegion(final RpcController controller, 1539 final CompactRegionRequest request) throws ServiceException { 1540 try { 1541 checkOpen(); 1542 requestCount.increment(); 1543 HRegion region = getRegion(request.getRegion()); 1544 // Quota support is enabled, the requesting user is not system/super user 1545 // and a quota policy is enforced that disables compactions. 
1546 if ( 1547 QuotaUtil.isQuotaEnabled(getConfiguration()) 1548 && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null)) 1549 && this.server.getRegionServerSpaceQuotaManager() 1550 .areCompactionsDisabled(region.getTableDescriptor().getTableName()) 1551 ) { 1552 throw new DoNotRetryIOException( 1553 "Compactions on this region are " + "disabled due to a space quota violation."); 1554 } 1555 region.startRegionOperation(Operation.COMPACT_REGION); 1556 LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString()); 1557 boolean major = request.hasMajor() && request.getMajor(); 1558 if (request.hasFamily()) { 1559 byte[] family = request.getFamily().toByteArray(); 1560 String log = "User-triggered " + (major ? "major " : "") + "compaction for region " 1561 + region.getRegionInfo().getRegionNameAsString() + " and family " 1562 + Bytes.toString(family); 1563 LOG.trace(log); 1564 region.requestCompaction(family, log, Store.PRIORITY_USER, major, 1565 CompactionLifeCycleTracker.DUMMY); 1566 } else { 1567 String log = "User-triggered " + (major ? 
"major " : "") + "compaction for region " 1568 + region.getRegionInfo().getRegionNameAsString(); 1569 LOG.trace(log); 1570 region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY); 1571 } 1572 return CompactRegionResponse.newBuilder().build(); 1573 } catch (IOException ie) { 1574 throw new ServiceException(ie); 1575 } 1576 } 1577 1578 @Override 1579 @QosPriority(priority = HConstants.ADMIN_QOS) 1580 public CompactionSwitchResponse compactionSwitch(RpcController controller, 1581 CompactionSwitchRequest request) throws ServiceException { 1582 rpcPreCheck("compactionSwitch"); 1583 final CompactSplit compactSplitThread = server.getCompactSplitThread(); 1584 requestCount.increment(); 1585 boolean prevState = compactSplitThread.isCompactionsEnabled(); 1586 CompactionSwitchResponse response = 1587 CompactionSwitchResponse.newBuilder().setPrevState(prevState).build(); 1588 if (prevState == request.getEnabled()) { 1589 // passed in requested state is same as current state. No action required 1590 return response; 1591 } 1592 compactSplitThread.switchCompaction(request.getEnabled()); 1593 return response; 1594 } 1595 1596 /** 1597 * Flush a region on the region server. 
1598 * @param controller the RPC controller 1599 * @param request the request 1600 */ 1601 @Override 1602 @QosPriority(priority = HConstants.ADMIN_QOS) 1603 public FlushRegionResponse flushRegion(final RpcController controller, 1604 final FlushRegionRequest request) throws ServiceException { 1605 try { 1606 checkOpen(); 1607 requestCount.increment(); 1608 HRegion region = getRegion(request.getRegion()); 1609 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); 1610 boolean shouldFlush = true; 1611 if (request.hasIfOlderThanTs()) { 1612 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs(); 1613 } 1614 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); 1615 if (shouldFlush) { 1616 boolean writeFlushWalMarker = 1617 request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false; 1618 // Go behind the curtain so we can manage writing of the flush WAL marker 1619 HRegion.FlushResultImpl flushResult = null; 1620 if (request.hasFamily()) { 1621 List families = new ArrayList(); 1622 families.add(request.getFamily().toByteArray()); 1623 flushResult = 1624 region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1625 } else { 1626 flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1627 } 1628 boolean compactionNeeded = flushResult.isCompactionNeeded(); 1629 if (compactionNeeded) { 1630 server.getCompactSplitThread().requestSystemCompaction(region, 1631 "Compaction through user triggered flush"); 1632 } 1633 builder.setFlushed(flushResult.isFlushSucceeded()); 1634 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker); 1635 } 1636 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores()); 1637 return builder.build(); 1638 } catch (DroppedSnapshotException ex) { 1639 // Cache flush can fail in a few places. 
If it fails in a critical 1640 // section, we get a DroppedSnapshotException and a replay of wal 1641 // is required. Currently the only way to do this is a restart of 1642 // the server. 1643 server.abort("Replay of WAL required. Forcing server shutdown", ex); 1644 throw new ServiceException(ex); 1645 } catch (IOException ie) { 1646 throw new ServiceException(ie); 1647 } 1648 } 1649 1650 @Override 1651 @QosPriority(priority = HConstants.ADMIN_QOS) 1652 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, 1653 final GetOnlineRegionRequest request) throws ServiceException { 1654 try { 1655 checkOpen(); 1656 requestCount.increment(); 1657 Map<String, HRegion> onlineRegions = server.getOnlineRegions(); 1658 List<RegionInfo> list = new ArrayList<>(onlineRegions.size()); 1659 for (HRegion region : onlineRegions.values()) { 1660 list.add(region.getRegionInfo()); 1661 } 1662 list.sort(RegionInfo.COMPARATOR); 1663 return ResponseConverter.buildGetOnlineRegionResponse(list); 1664 } catch (IOException ie) { 1665 throw new ServiceException(ie); 1666 } 1667 } 1668 1669 // Master implementation of this Admin Service differs given it is not 1670 // able to supply detail only known to RegionServer. See note on 1671 // MasterRpcServers#getRegionInfo. 
@Override
@QosPriority(priority = HConstants.ADMIN_QOS)
public GetRegionInfoResponse getRegionInfo(final RpcController controller,
  final GetRegionInfoRequest request) throws ServiceException {
  try {
    checkOpen();
    requestCount.increment();
    HRegion region = getRegion(request.getRegion());
    RegionInfo info = region.getRegionInfo();
    byte[] bestSplitRow;
    if (request.hasBestSplitRow() && request.getBestSplitRow()) {
      bestSplitRow = region.checkSplit(true).orElse(null);
      // when all table data are in memstore, bestSplitRow = null
      // try to flush region first
      if (bestSplitRow == null) {
        region.flush(true);
        bestSplitRow = region.checkSplit(true).orElse(null);
      }
    } else {
      bestSplitRow = null;
    }
    GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
    builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
    if (request.hasCompactionState() && request.getCompactionState()) {
      builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
    }
    builder.setSplittable(region.isSplittable());
    builder.setMergeable(region.isMergeable());
    if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
      builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
    }
    return builder.build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}

/**
 * Returns load statistics for every region of the requested table, or for every region on this
 * server when no table name is given.
 */
@Override
@QosPriority(priority = HConstants.ADMIN_QOS)
public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
  throws ServiceException {

  List<HRegion> regions;
  if (request.hasTableName()) {
    TableName tableName = ProtobufUtil.toTableName(request.getTableName());
    regions = server.getRegions(tableName);
  } else {
    regions = server.getRegions();
  }
  List<RegionLoad> rLoads = new ArrayList<>(regions.size());
  RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
  RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();

  try {
    for (HRegion region : regions) {
      rLoads.add(server.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
    }
  } catch (IOException e) {
    throw new ServiceException(e);
  }
  GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
  builder.addAllRegionLoads(rLoads);
  return builder.build();
}

/**
 * Clears the named compaction queues ("long" and/or "short"). The clearCompactionQueues flag
 * serializes concurrent admin requests: only one clear runs at a time, others are ignored.
 */
@Override
@QosPriority(priority = HConstants.ADMIN_QOS)
public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
  ClearCompactionQueuesRequest request) throws ServiceException {
  LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
    + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
  ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
  requestCount.increment();
  if (clearCompactionQueues.compareAndSet(false, true)) {
    final CompactSplit compactSplitThread = server.getCompactSplitThread();
    try {
      checkOpen();
      server.getRegionServerCoprocessorHost().preClearCompactionQueues();
      for (String queueName : request.getQueueNameList()) {
        LOG.debug("clear " + queueName + " compaction queue");
        switch (queueName) {
          case "long":
            compactSplitThread.clearLongCompactionsQueue();
            break;
          case "short":
            compactSplitThread.clearShortCompactionsQueue();
            break;
          default:
            LOG.warn("Unknown queue name " + queueName);
            throw new IOException("Unknown queue name " + queueName);
        }
      }
      server.getRegionServerCoprocessorHost().postClearCompactionQueues();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Always release the guard so subsequent clear requests can proceed.
      clearCompactionQueues.set(false);
    }
  } else {
    LOG.warn("Clear compactions queue is executing by other admin.");
  }
  return respBuilder.build();
}

/**
 * Get some information of the region server.
 * @param controller the RPC controller
 * @param request the request
 */
@Override
@QosPriority(priority = HConstants.ADMIN_QOS)
public GetServerInfoResponse getServerInfo(final RpcController controller,
  final GetServerInfoRequest request) throws ServiceException {
  try {
    checkOpen();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
  requestCount.increment();
  int infoPort = server.getInfoServer() != null ? server.getInfoServer().getPort() : -1;
  return ResponseConverter.buildGetServerInfoResponse(server.getServerName(), infoPort);
}

/**
 * Lists store file paths of a region, restricted to the requested column families, or to all
 * families of the region when none are specified.
 */
@Override
@QosPriority(priority = HConstants.ADMIN_QOS)
public GetStoreFileResponse getStoreFile(final RpcController controller,
  final GetStoreFileRequest request) throws ServiceException {
  try {
    checkOpen();
    HRegion region = getRegion(request.getRegion());
    requestCount.increment();
    Set<byte[]> columnFamilies;
    if (request.getFamilyCount() == 0) {
      columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
    } else {
      // TreeSet with a raw byte[] comparator de-duplicates families from the request.
      columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
      for (ByteString cf : request.getFamilyList()) {
        columnFamilies.add(cf.toByteArray());
      }
    }
    int nCF = columnFamilies.size();
    List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
    GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
    builder.addAllStoreFile(fileList);
    return builder.build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}

private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
  if (!request.hasServerStartCode()) {
LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
    return;
  }
  throwOnWrongStartCode(request.getServerStartCode());
}

/** Rejects a CloseRegionRequest carrying a start code that does not match this server. */
private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
  if (!request.hasServerStartCode()) {
    LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
    return;
  }
  throwOnWrongStartCode(request.getServerStartCode());
}

/** Fails fast when an RPC was addressed to a previous incarnation of this server. */
private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
  // check that we are the same server that this RPC is intended for.
  if (server.getServerName().getStartcode() != serverStartCode) {
    throw new ServiceException(new DoNotRetryIOException(
      "This RPC was intended for a " + "different server with startCode: " + serverStartCode
        + ", this server is: " + server.getServerName()));
  }
}

/** Applies the start-code check to every open/close sub-request of a procedure batch. */
private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
  if (req.getOpenRegionCount() > 0) {
    for (OpenRegionRequest openReq : req.getOpenRegionList()) {
      throwOnWrongStartCode(openReq);
    }
  }
  if (req.getCloseRegionCount() > 0) {
    for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
      throwOnWrongStartCode(closeReq);
    }
  }
}

/**
 * Open asynchronously a region or a set of regions on the region server. The opening is
 * coordinated by ZooKeeper, and this method requires the znode to be created before being called.
 * As a consequence, this method should be called only from the master.
 * <p>
 * Different manages states for the region are:
 * </p>
 * <ul>
 * <li>region not opened: the region opening will start asynchronously.</li>
 * <li>a close is already in progress: this is considered as an error.</li>
 * <li>an open is already in progress: this new open request will be ignored. This is important
 * because the Master can do multiple requests if it crashes.</li>
 * <li>the region is already opened: this new open request will be ignored.</li>
 * </ul>
 * <p>
 * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
 * For a single region opening, errors are sent through a ServiceException. For bulk assign,
 * errors are put in the response as FAILED_OPENING.
 * </p>
 * @param controller the RPC controller
 * @param request the request
 */
@Override
@QosPriority(priority = HConstants.ADMIN_QOS)
public OpenRegionResponse openRegion(final RpcController controller,
  final OpenRegionRequest request) throws ServiceException {
  requestCount.increment();
  throwOnWrongStartCode(request);

  OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
  final int regionCount = request.getOpenInfoCount();
  final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
  final boolean isBulkAssign = regionCount > 1;
  try {
    checkOpen();
  } catch (IOException ie) {
    TableName tableName = null;
    if (regionCount == 1) {
      org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
        request.getOpenInfo(0).getRegion();
      if (ri != null) {
        tableName = ProtobufUtil.toTableName(ri.getTableName());
      }
    }
    if (!TableName.META_TABLE_NAME.equals(tableName)) {
      throw new ServiceException(ie);
    }
    // We are assigning meta, wait a little for regionserver to finish initialization.
    // Default to quarter of RPC timeout
    int timeout = server.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
    long endTime = EnvironmentEdgeManager.currentTime() + timeout;
    synchronized (server.online) {
      try {
        while (
          EnvironmentEdgeManager.currentTime() <= endTime && !server.isStopped()
            && !server.isOnline()
        ) {
          server.online.wait(server.getMsgInterval());
        }
        checkOpen();
      } catch (InterruptedException t) {
        // Restore the interrupt flag before surfacing the failure.
        Thread.currentThread().interrupt();
        throw new ServiceException(t);
      } catch (IOException e) {
        throw new ServiceException(e);
      }
    }
  }

  long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;

  for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
    final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
    TableDescriptor htd;
    try {
      String encodedName = region.getEncodedName();
      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
      final HRegion onlineRegion = server.getRegion(encodedName);
      if (onlineRegion != null) {
        // The region is already online. This should not happen any more.
        String error = "Received OPEN for the region:" + region.getRegionNameAsString()
          + ", which is already online";
        LOG.warn(error);
        // server.abort(error);
        // throw new IOException(error);
        builder.addOpeningState(RegionOpeningState.OPENED);
        continue;
      }
      LOG.info("Open " + region.getRegionNameAsString());

      // Mark the region as in-transition; `previous` tells us what was already in flight.
      final Boolean previous =
        server.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);

      if (Boolean.FALSE.equals(previous)) {
        if (server.getRegion(encodedName) != null) {
          // There is a close in progress. This should not happen any more.
          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
            + ", which we are already trying to CLOSE";
          server.abort(error);
          throw new IOException(error);
        }
        server.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
      }

      if (Boolean.TRUE.equals(previous)) {
        // An open is in progress. This is supported, but let's log this.
        LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
          + ", which we are already trying to OPEN"
          + " - ignoring this new request for this region.");
      }

      // We are opening this region. If it moves back and forth for whatever reason, we don't
      // want to keep returning the stale moved record while we are opening/if we close again.
      server.removeFromMovedRegions(region.getEncodedName());

      if (previous == null || !previous.booleanValue()) {
        htd = htds.get(region.getTable());
        if (htd == null) {
          htd = server.getTableDescriptors().get(region.getTable());
          htds.put(region.getTable(), htd);
        }
        if (htd == null) {
          throw new IOException("Missing table descriptor for " + region.getEncodedName());
        }
        // If there is no action in progress, we can submit a specific handler.
        // Need to pass the expected version in the constructor.
        if (server.getExecutorService() == null) {
          LOG.info("No executor executorService; skipping open request");
        } else {
          if (region.isMetaRegion()) {
            server.getExecutorService()
              .submit(new OpenMetaHandler(server, server, region, htd, masterSystemTime));
          } else {
            if (regionOpenInfo.getFavoredNodesCount() > 0) {
              server.updateRegionFavoredNodesMapping(region.getEncodedName(),
                regionOpenInfo.getFavoredNodesList());
            }
            if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
              server.getExecutorService().submit(
                new OpenPriorityRegionHandler(server, server, region, htd, masterSystemTime));
            } else {
              server.getExecutorService()
                .submit(new OpenRegionHandler(server, server, region, htd, masterSystemTime));
            }
          }
        }
      }

      builder.addOpeningState(RegionOpeningState.OPENED);
    } catch (IOException ie) {
      LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
      if (isBulkAssign) {
        builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
      } else {
        throw new ServiceException(ie);
      }
    }
  }
  return builder.build();
}

/**
 * Warmup a region on this server. This method should only be called by Master. It synchronously
 * opens the region and closes the region bringing the most important pages in cache.
 */
@Override
public WarmupRegionResponse warmupRegion(final RpcController controller,
  final WarmupRegionRequest request) throws ServiceException {
  final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
  WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
  try {
    checkOpen();
    String encodedName = region.getEncodedName();
    byte[] encodedNameBytes = region.getEncodedNameAsBytes();
    final HRegion onlineRegion = server.getRegion(encodedName);
    if (onlineRegion != null) {
      LOG.info("{} is online; skipping warmup", region);
      return response;
    }
    TableDescriptor htd = server.getTableDescriptors().get(region.getTable());
    if (server.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
      LOG.info("{} is in transition; skipping warmup", region);
      return response;
    }
    LOG.info("Warmup {}", region.getRegionNameAsString());
    HRegion.warmupHRegion(region, htd, server.getWAL(region), server.getConfiguration(), server,
      null);
  } catch (IOException ie) {
    LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
    throw new ServiceException(ie);
  }

  return response;
}

/**
 * Detaches and returns the cell scanner carried by the RPC controller, clearing it on the
 * controller so the cells are consumed exactly once.
 */
private CellScanner getAndReset(RpcController controller) {
  HBaseRpcController hrc = (HBaseRpcController) controller;
  CellScanner cells = hrc.cellScanner();
  hrc.setCellScanner(null);
  return cells;
}

/**
 * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
 * that the given mutations will be durable on the receiving RS if this method returns without any
 * exception.
 * @param controller the RPC controller
 * @param request the request
 * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for
 *             compatibility with old region replica implementation. Now we will use
 *             {@code replicateToReplica} method instead.
 */
@Deprecated
@Override
@QosPriority(priority = HConstants.REPLAY_QOS)
public ReplicateWALEntryResponse replay(final RpcController controller,
  final ReplicateWALEntryRequest request) throws ServiceException {
  long before = EnvironmentEdgeManager.currentTime();
  CellScanner cells = getAndReset(controller);
  try {
    checkOpen();
    List<WALEntry> entries = request.getEntryList();
    if (entries == null || entries.isEmpty()) {
      // empty input
      return ReplicateWALEntryResponse.newBuilder().build();
    }
    ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
    HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
    RegionCoprocessorHost coprocessorHost =
      ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
        ? region.getCoprocessorHost()
        : null; // do not invoke coprocessors if this is a secondary region replica

    // Skip adding the edits to WAL if this is a secondary region replica
    boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
    Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;

    for (WALEntry entry : entries) {
      if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
        throw new NotServingRegionException("Replay request contains entries from multiple "
          + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
          + entry.getKey().getEncodedRegionName());
      }
      if (server.nonceManager != null && isPrimary) {
        long nonceGroup =
          entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
        long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
        server.nonceManager.reportOperationFromWal(nonceGroup, nonce,
          entry.getKey().getWriteTime());
      }
      Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
      List<MutationReplay> edits =
        WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
      if (edits != null && !edits.isEmpty()) {
        // HBASE-17924
        // sort to improve lock efficiency
        Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
        long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
          ? entry.getKey().getOrigSequenceNumber()
          : entry.getKey().getLogSequenceNumber();
        OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
        // check if it's a partial success
        for (int i = 0; result != null && i < result.length; i++) {
          if (result[i] != OperationStatus.SUCCESS) {
            throw new IOException(result[i].getExceptionMsg());
          }
        }
      }
    }

    // sync wal at the end because ASYNC_WAL is used above
    WAL wal = region.getWAL();
    if (wal != null) {
      wal.sync();
    }
    return ReplicateWALEntryResponse.newBuilder().build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  } finally {
    // Record replay latency even on failure.
    final MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
    }
  }
}

/**
 * Replay the given changes on a secondary replica
 */
@Override
public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
  ReplicateWALEntryRequest request) throws ServiceException {
  CellScanner cells = getAndReset(controller);
  try {
    checkOpen();
    List<WALEntry> entries = request.getEntryList();
    if (entries == null || entries.isEmpty()) {
      // empty input
      return ReplicateWALEntryResponse.newBuilder().build();
    }
    ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
    HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
    if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
      throw new DoNotRetryIOException(
        "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
    }
    for (WALEntry entry : entries) {
      if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
        throw new NotServingRegionException(
          "ReplicateToReplica request contains entries from multiple " + "regions. First region:"
            + regionName.toStringUtf8() + " , other region:"
            + entry.getKey().getEncodedRegionName());
      }
      region.replayWALEntry(entry, cells);
    }
    return ReplicateWALEntryResponse.newBuilder().build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}

/**
 * Rejects incoming replication when the sink side of a sync-replication peer must not accept
 * edits (ACTIVE or DOWNGRADE_ACTIVE state). No-op when replication sources are not running.
 */
private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
  ReplicationSourceService replicationSource = server.getReplicationSourceService();
  if (replicationSource == null || entries.isEmpty()) {
    return;
  }
  // We can ensure that all entries are for one peer, so only need to check one entry's
  // table name. if the table hit sync replication at peer side and the peer cluster
  // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply
  // those entries according to the design doc.
  TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray());
  if (
    replicationSource.getSyncReplicationPeerInfoProvider().checkState(table,
      RejectReplicationRequestStateChecker.get())
  ) {
    throw new DoNotRetryIOException(
      "Reject to apply to sink cluster because sync replication state of sink cluster "
        + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table);
  }
}

/**
 * Replicate WAL entries on the region server.
 * @param controller the RPC controller
 * @param request the request
 */
@Override
@QosPriority(priority = HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
  final ReplicateWALEntryRequest request) throws ServiceException {
  try {
    checkOpen();
    if (server.getReplicationSinkService() != null) {
      requestCount.increment();
      List<WALEntry> entries = request.getEntryList();
      checkShouldRejectReplicationRequest(entries);
      CellScanner cellScanner = getAndReset(controller);
      server.getRegionServerCoprocessorHost().preReplicateLogEntries();
      server.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
        request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
        request.getSourceHFileArchiveDirPath());
      server.getRegionServerCoprocessorHost().postReplicateLogEntries();
      return ReplicateWALEntryResponse.newBuilder().build();
    } else {
      throw new ServiceException("Replication services are not initialized yet");
    }
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}

/**
 * Roll the WAL writer of the region server.
2234 * @param controller the RPC controller 2235 * @param request the request 2236 */ 2237 @Override 2238 @QosPriority(priority = HConstants.ADMIN_QOS) 2239 public RollWALWriterResponse rollWALWriter(final RpcController controller, 2240 final RollWALWriterRequest request) throws ServiceException { 2241 try { 2242 checkOpen(); 2243 requestCount.increment(); 2244 server.getRegionServerCoprocessorHost().preRollWALWriterRequest(); 2245 server.getWalRoller().requestRollAll(); 2246 server.getRegionServerCoprocessorHost().postRollWALWriterRequest(); 2247 RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder(); 2248 return builder.build(); 2249 } catch (IOException ie) { 2250 throw new ServiceException(ie); 2251 } 2252 } 2253 2254 /** 2255 * Stop the region server. 2256 * @param controller the RPC controller 2257 * @param request the request 2258 */ 2259 @Override 2260 @QosPriority(priority = HConstants.ADMIN_QOS) 2261 public StopServerResponse stopServer(final RpcController controller, 2262 final StopServerRequest request) throws ServiceException { 2263 rpcPreCheck("stopServer"); 2264 requestCount.increment(); 2265 String reason = request.getReason(); 2266 server.stop(reason); 2267 return StopServerResponse.newBuilder().build(); 2268 } 2269 2270 @Override 2271 public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller, 2272 UpdateFavoredNodesRequest request) throws ServiceException { 2273 rpcPreCheck("updateFavoredNodes"); 2274 List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList(); 2275 UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder(); 2276 for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) { 2277 RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion()); 2278 if (regionUpdateInfo.getFavoredNodesCount() > 0) { 2279 server.updateRegionFavoredNodesMapping(hri.getEncodedName(), 2280 
          regionUpdateInfo.getFavoredNodesList());
      }
    }
    respBuilder.setResponse(openInfoList.size());
    return respBuilder.build();
  }

  /**
   * Atomically bulk load several HFiles into an open region
   * @return true if successful, false is failed but recoverably (no action)
   * @throws ServiceException if failed unrecoverably
   */
  @Override
  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
    final BulkLoadHFileRequest request) throws ServiceException {
    long start = EnvironmentEdgeManager.currentTime();
    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
    // If our own cluster id is already in the list, this load has cycled back to us via
    // replication; report success without re-loading. Otherwise tag the request with our
    // cluster id before forwarding to the secure bulk load path.
    if (clusterIds.contains(this.server.getClusterId())) {
      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
    } else {
      clusterIds.add(this.server.getClusterId());
    }
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
      // -1 acts as the "no size computed" sentinel; see the negative-size note below.
      long sizeToBeLoaded = -1;

      // Check to see if this bulk load would exceed the space quota for this table
      if (spaceQuotaEnabled) {
        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
        SpaceViolationPolicyEnforcement enforcement =
          activeSpaceQuotas.getPolicyEnforcement(region);
        if (enforcement != null) {
          // Bulk loads must still be atomic. We must enact all or none.
          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
          for (FamilyPath familyPath : request.getFamilyPathList()) {
            filePaths.add(familyPath.getPath());
          }
          // Check if the batch of files exceeds the current quota
          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
        }
      }
      // secure bulk load
      Map<byte[], List<Path>> map =
        server.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
      builder.setLoaded(map != null);
      if (map != null) {
        // Treat any negative size as a flag to "ignore" updating the region size as that is
        // not possible to occur in real life (cannot bulk load a file with negative size)
        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
              + sizeToBeLoaded + " bytes");
          }
          // Inform space quotas of the new files for this region
          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
            sizeToBeLoaded);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record bulk-load latency even on failure or on the early-return replication case above
      // being skipped (this finally only covers the try).
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
      }
    }
  }

  /**
   * Open a secure bulk load session for a region and hand the client a bulk token to use in
   * subsequent bulkLoadHFile / cleanupBulkLoad calls.
   */
  @Override
  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
    PrepareBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      String bulkToken = server.getSecureBulkLoadManager().prepareBulkLoad(region, request);
      PrepareBulkLoadResponse.Builder builder =
        PrepareBulkLoadResponse.newBuilder();
      builder.setBulkToken(bulkToken);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Tear down the secure bulk load session identified by the request's bulk token.
   */
  @Override
  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
    CleanupBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      server.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
      return CleanupBulkLoadResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Execute a region-scoped coprocessor endpoint call and wrap its serialized result in a
   * CoprocessorServiceResponse tagged with the region name.
   */
  @Override
  public CoprocessorServiceResponse execService(final RpcController controller,
    final CoprocessorServiceRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      Message result = execServiceOnRegion(region, request.getCall());
      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
        region.getRegionInfo().getRegionName()));
      // TODO: COPIES!!!!!!
      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
        org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Resolve the filesystem to use for sizing bulk-load files: the server's own filesystem when
   * no paths are given, otherwise the filesystem of the first path (all paths are assumed to
   * live on the same filesystem — TODO confirm with callers).
   */
  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
    if (filePaths.isEmpty()) {
      // local hdfs
      return server.getFileSystem();
    }
    // source hdfs
    return new Path(filePaths.get(0)).getFileSystem(server.getConfiguration());
  }

  /** Dispatch a coprocessor endpoint call to the region with a fresh server-side controller. */
  private Message execServiceOnRegion(HRegion region,
    final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
    // ignore the passed in controller (from the serialized call)
    ServerRpcController execController = new ServerRpcController();
    return region.execService(execController, serviceCall);
  }

  /**
   * True when client requests against this region must be rejected because its table is in
   * sync-replication STANDBY state on this cluster.
   */
  private boolean shouldRejectRequestsFromClient(HRegion region) {
    TableName table = region.getRegionInfo().getTable();
    ReplicationSourceService service = server.getReplicationSourceService();
    return service != null && service.getSyncReplicationPeerInfoProvider().checkState(table,
      RejectRequestsFromClientStateChecker.get());
  }

  /** Throw DoNotRetryIOException if the region's table is in STANDBY sync-replication state. */
  private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
    if (shouldRejectRequestsFromClient(region)) {
      throw new DoNotRetryIOException(
        region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state.");
    }
  }

  /**
   * Get data from a table.
   * @param controller the RPC controller
   * @param request    the get request
   */
  @Override
  public GetResponse get(final RpcController controller, final GetRequest request)
    throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    OperationQuota quota = null;
    HRegion region = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    try {
      checkOpen();
      requestCount.increment();
      rpcGetRequestCount.increment();
      region = getRegion(request.getRegion());
      rejectIfInStandByState(region);

      GetResponse.Builder builder = GetResponse.newBuilder();
      ClientProtos.Get get = request.getGet();
      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
          + "reverse Scan.");
      }
      Boolean existence = null;
      Result r = null;
      quota = getRpcQuotaManager().checkBatchQuota(region, OperationQuota.OperationType.GET);

      Get clientGet = ProtobufUtil.toGet(get);
      // Existence-only gets may be short-circuited entirely by the preExists coprocessor hook.
      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
        existence = region.getCoprocessorHost().preExists(clientGet);
      }
      if (existence == null) {
        if (context != null) {
          r = get(clientGet, (region), null, context);
        } else {
          // for test purpose
          r = region.get(clientGet);
        }
        if (get.getExistenceOnly()) {
          // NOTE(review): auto-unboxes r.getExists(); presumably non-null for an
          // existence-only get — confirm upstream contract.
          boolean exists = r.getExists();
          if (region.getCoprocessorHost() != null) {
            exists = region.getCoprocessorHost().postExists(clientGet, exists);
          }
          existence = exists;
        }
      }
      if (existence != null) {
        ClientProtos.Result pbr =
          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
        builder.setResult(pbr);
      } else if (r != null) {
        ClientProtos.Result pbr;
        // Send cells out-of-band in a cell block when the client supports it (1.3+);
        // otherwise embed them in the protobuf Result.
        if (
          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
        ) {
          pbr = ProtobufUtil.toResultNoData(r);
          ((HBaseRpcController) controller)
            .setCellScanner(CellUtil.createCellScanner(r.rawCells()));
          addSize(context, r);
        } else {
          pbr = ProtobufUtil.toResult(r);
        }
        builder.setResult(pbr);
      }
      // r.cells is null when an table.exists(get) call
      if (r != null && r.rawCells() != null) {
        quota.addGetResult(r);
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null && region != null) {
        long blockBytesScanned = context != null ? context.getBlockBytesScanned() : 0;
        metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
          blockBytesScanned);
      }
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Server-side Get implemented via a one-row Scan, with coprocessor pre/post hooks and scanner
   * lifecycle management. The created scanner is not closed here: it is registered either with
   * the RpcCallContext (single get; context must be non-null when closeCallBack is null) or with
   * the supplied closeCallBack (multi()), whose callback closes it after the response is sent.
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
    RpcCallContext context) throws IOException {
    region.prepareGet(get);
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        region.metricsUpdateForGet();
        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
          stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet();

    return Result.create(results, get.isCheckExistenceOnly() ?
!results.isEmpty() : null, stale); 2577 } 2578 2579 private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException { 2580 int sum = 0; 2581 String firstRegionName = null; 2582 for (RegionAction regionAction : request.getRegionActionList()) { 2583 if (sum == 0) { 2584 firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray()); 2585 } 2586 sum += regionAction.getActionCount(); 2587 } 2588 if (sum > rowSizeWarnThreshold) { 2589 LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold 2590 + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: " 2591 + RpcServer.getRequestUserName().orElse(null) + "/" 2592 + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName); 2593 if (rejectRowsWithSizeOverThreshold) { 2594 throw new ServiceException( 2595 "Rejecting large batch operation for current batch with firstRegionName: " 2596 + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: " 2597 + rowSizeWarnThreshold); 2598 } 2599 } 2600 } 2601 2602 private void failRegionAction(MultiResponse.Builder responseBuilder, 2603 RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction, 2604 CellScanner cellScanner, Throwable error) { 2605 rpcServer.getMetrics().exception(error); 2606 regionActionResultBuilder.setException(ResponseConverter.buildException(error)); 2607 responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); 2608 // All Mutations in this RegionAction not executed as we can not see the Region online here 2609 // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner 2610 // corresponding to these Mutations. 2611 if (cellScanner != null) { 2612 skipCellsForMutations(regionAction.getActionList(), cellScanner); 2613 } 2614 } 2615 2616 private boolean isReplicationRequest(Action action) { 2617 // replication request can only be put or delete. 
    if (!action.hasMutation()) {
      return false;
    }
    MutationProto mutation = action.getMutation();
    MutationType type = mutation.getMutateType();
    if (type != MutationType.PUT && type != MutationType.DELETE) {
      return false;
    }
    // replication will set a special attribute so we can make use of it to decide whether a request
    // is for replication.
    return mutation.getAttributeList().stream().map(p -> p.getName())
      .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
  }

  /**
   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
   * @param rpcc    the RPC controller
   * @param request the multi request
   */
  @Override
  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
    throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }

    checkBatchSizeAndLogLargeSize(request);

    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
    // Detach the inbound cell scanner from the controller so we are not holding a reference
    // across the call; the controller is reused for the outbound cells at the end.
    if (controller != null) {
      controller.setCellScanner(null);
    }

    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;

    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
    this.rpcMultiRequestCount.increment();
    this.requestCount.increment();
    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();

    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
    // following logic is for backward compatibility as old clients still use
    // MultiRequest#condition in case of checkAndMutate with RowMutations.
    if (request.hasCondition()) {
      if (request.getRegionActionList().isEmpty()) {
        // If the region action list is empty, do nothing.
        responseBuilder.setProcessed(true);
        return responseBuilder.build();
      }

      RegionAction regionAction = request.getRegionAction(0);

      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
      // we can assume regionAction.getAtomic() is true here.
      assert regionAction.getAtomic();

      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
          regionAction.hasCondition());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        return responseBuilder.build();
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
        // We only allow replication in standby state and it will not set the atomic flag.
        if (rejectIfFromClient) {
          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
            new DoNotRetryIOException(
              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
          return responseBuilder.build();
        }

        try {
          CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
            cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
          responseBuilder.setProcessed(result.isSuccess());
          ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
            ClientProtos.ResultOrException.newBuilder();
          for (int i = 0; i < regionAction.getActionCount(); i++) {
            // To unify the response format with doNonAtomicRegionMutation and read through
            // client's AsyncProcess we have to add an empty result instance per operation
            resultOrExceptionOrBuilder.clear();
            resultOrExceptionOrBuilder.setIndex(i);
            regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
          }
        } catch (IOException e) {
          rpcServer.getMetrics().exception(e);
          // As it's an atomic operation with a condition, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        }
      } finally {
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
      }
      return responseBuilder.build();
    }

    // this will contain all the cells that we need to return. It's created later, if needed.
    List<CellScannable> cellsToReturn = null;
    RegionScannersCloseCallBack closeCallBack = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
      new HashMap<>(request.getRegionActionCount());

    for (RegionAction regionAction : request.getRegionActionList()) {
      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();
      regionActionResultBuilder.clear();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
          regionAction.hasCondition());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        continue; // For this region it's a failure.
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);

        if (regionAction.hasCondition()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }

          try {
            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
              ClientProtos.ResultOrException.newBuilder();
            if (regionAction.getActionCount() == 1) {
              // Single-action checkAndMutate: result goes directly into index 0.
              CheckAndMutateResult result =
                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              resultOrExceptionOrBuilder.setIndex(0);
              if (result.getResult() != null) {
                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
              }
              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
            } else {
              // Multi-action (RowMutations) checkAndMutate.
              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              for (int i = 0; i < regionAction.getActionCount(); i++) {
                if (i == 0 && result.getResult() != null) {
                  // Set the result of the Increment/Append operations to the first element of the
                  // ResultOrException list
                  resultOrExceptionOrBuilder.setIndex(i);
                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
                  continue;
                }
                // To unify the response format with doNonAtomicRegionMutation and read through
                // client's AsyncProcess we have to add an empty result instance per operation
                resultOrExceptionOrBuilder.clear();
                resultOrExceptionOrBuilder.setIndex(i);
                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
              }
            }
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's an atomic operation with a condition, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }
          try {
            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
              cellScanner, nonceGroup, spaceQuotaEnforcement);
            regionActionResultBuilder.setProcessed(true);
            // We no longer use MultiResponse#processed. Instead, we use
            // RegionActionResult#processed. This is for backward compatibility for old clients.
            responseBuilder.setProcessed(true);
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's atomic, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else {
          if (
            rejectIfFromClient && regionAction.getActionCount() > 0
              && !isReplicationRequest(regionAction.getAction(0))
          ) {
            // fail if it is not a replication request
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }
          // doNonAtomicRegionMutation manages the exception internally
          if (context != null && closeCallBack == null) {
            // An RpcCallBack that creates a list of scanners that needs to perform callBack
            // operation on completion of multiGets.
            // Set this only once
            closeCallBack = new RegionScannersCloseCallBack();
            context.setCallBack(closeCallBack);
          }
          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
            spaceQuotaEnforcement);
        }
      } finally {
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        regionStats.put(regionSpecifier, regionLoadStats);
      }
    }
    // Load the controller with the Cells to return.
    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
    }

    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
      builder.addRegion(stat.getKey());
      builder.addStat(stat.getValue());
    }
    responseBuilder.setRegionStatistics(builder);
    return responseBuilder.build();
  }

  /** Skip the CellScanner past the cells of every mutation in the given actions. */
  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    for (Action action : actions) {
      skipCellsForMutation(action, cellScanner);
    }
  }

  /**
   * Skip the CellScanner past the cells associated with one action's mutation, so that
   * subsequent RegionActions read their own cells from the shared scanner.
   */
  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    try {
      if (action.hasMutation()) {
        MutationProto m = action.getMutation();
        if (m.hasAssociatedCellCount()) {
          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
            cellScanner.advance();
          }
        }
      }
    } catch (IOException e) {
      // No need to handle these individual Mutation level issues. Any way this entire RegionAction
      // marked as failed as we could not see the Region here. At client side the top level
      // RegionAction exception will be considered first.
      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
    }
  }

  /**
   * Mutate data in a table.
   * @param rpcc    the RPC controller
   * @param request the mutate request
   */
  @Override
  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
    throws ServiceException {
    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
    OperationQuota quota = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    // Clear scanner so we are not holding on to reference across call.
    if (controller != null) {
      controller.setCellScanner(null);
    }
    try {
      checkOpen();
      requestCount.increment();
      rpcMutateRequestCount.increment();
      HRegion region = getRegion(request.getRegion());
      rejectIfInStandByState(region);
      MutateResponse.Builder builder = MutateResponse.newBuilder();
      MutationProto mutation = request.getMutation();
      // Give the memstore flusher a chance to reclaim memory before a user-table write.
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
      OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request);
      quota = getRpcQuotaManager().checkBatchQuota(region, operationType);
      ActivePolicyEnforcement spaceQuotaEnforcement =
        getSpaceQuotaManager().getActiveEnforcements();

      if (request.hasCondition()) {
        // Conditional path: checkAndMutate handles hooks, quota and metrics itself.
        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
          request.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
        builder.setProcessed(result.isSuccess());
        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
        if (clientCellBlockSupported) {
          addSize(context, result.getResult());
        }
      } else {
        Result r = null;
        Boolean processed = null;
        MutationType type = mutation.getMutateType();
        switch (type) {
          case APPEND:
            // TODO: this doesn't actually check anything.
            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
              context);
            break;
          case INCREMENT:
            // TODO: this doesn't actually check anything.
            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
              context);
            break;
          case PUT:
            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          case DELETE:
            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          default:
            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
        }
        if (processed != null) {
          builder.setProcessed(processed);
        }
        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
        addResult(builder, r, controller, clientCellBlockSupported);
        if (clientCellBlockSupported) {
          addSize(context, r);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      // An IOException here may indicate filesystem trouble; trigger a filesystem check.
      server.checkFileSystem();
      throw new ServiceException(ie);
    } finally {
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Execute a single Put: enforce cell-size limit and space quota, record it against the
   * operation quota, apply it to the region, and update put latency metrics.
   */
  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Put put = ProtobufUtil.toPut(mutation, cellScanner);
    checkCellSizeLimit(region, put);
    spaceQuota.getPolicyEnforcement(region).check(put);
    quota.addMutation(put);
    region.put(put);

    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updatePut(region, after - before);
    }
  }

  /**
   * Execute a single Delete: enforce cell-size limit and space quota, record it against the
   * operation quota, apply it to the region, and update delete latency metrics.
   */
  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, 
ActivePolicyEnforcement spaceQuota) throws IOException {
    // Continuation of delete(...): enforce cell-size limit, space-quota policy and
    // operation quota, apply the delete, then record latency if metrics are enabled.
    long before = EnvironmentEdgeManager.currentTime();
    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
    checkCellSizeLimit(region, delete);
    spaceQuota.getPolicyEnforcement(region).check(delete);
    quota.addMutation(delete);
    region.delete(delete);

    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updateDelete(region, after - before);
    }
  }

  /**
   * Execute a check-and-mutate against the region. A coprocessor preCheckAndMutate hook
   * may supply the result, in which case the region call is skipped entirely; otherwise
   * the region performs the atomic operation and postCheckAndMutate may transform the
   * result. Latency and block-bytes-scanned metrics are recorded on completion.
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
    quota.addMutation((Mutation) checkAndMutate.getAction());

    CheckAndMutateResult result = null;
    if (region.getCoprocessorHost() != null) {
      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
    }
    if (result == null) {
      // No coprocessor override: run the atomic check-and-mutate in the region.
      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
      if (region.getCoprocessorHost() != null) {
        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
      }
    }
    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      long blockBytesScanned =
        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
      metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned);

      // Also bump the type-specific checkAndPut / checkAndDelete latency metrics.
      MutationType type = mutation.getMutateType();
      switch (type) {
        case PUT:
          metricsRegionServer.updateCheckAndPut(region, after - before);
          break;
        case DELETE:
          metricsRegionServer.updateCheckAndDelete(region, after - before);
          break;
        default:
          break;
      }
    }
    return result;
  }

  // This is used to keep compatible with the old client implementation. Consider remove it if we
  // decide to drop the support of the client that still sends close request to a region scanner
  // which has already been exhausted.
3071 @Deprecated 3072 private static final IOException SCANNER_ALREADY_CLOSED = new IOException() { 3073 3074 private static final long serialVersionUID = -4305297078988180130L; 3075 3076 @Override 3077 public synchronized Throwable fillInStackTrace() { 3078 return this; 3079 } 3080 }; 3081 3082 private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException { 3083 String scannerName = toScannerName(request.getScannerId()); 3084 RegionScannerHolder rsh = this.scanners.get(scannerName); 3085 if (rsh == null) { 3086 // just ignore the next or close request if scanner does not exists. 3087 Long lastCallSeq = closedScanners.getIfPresent(scannerName); 3088 if (lastCallSeq != null) { 3089 // Check the sequence number to catch if the last call was incorrectly retried. 3090 // The only allowed scenario is when the scanner is exhausted and one more scan 3091 // request arrives - in this case returning 0 rows is correct. 3092 if (request.hasNextCallSeq() && request.getNextCallSeq() != lastCallSeq + 1) { 3093 throw new OutOfOrderScannerNextException("Expected nextCallSeq for closed request: " 3094 + (lastCallSeq + 1) + " But the nextCallSeq got from client: " 3095 + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request)); 3096 } else { 3097 throw SCANNER_ALREADY_CLOSED; 3098 } 3099 } else { 3100 LOG.warn("Client tried to access missing scanner " + scannerName); 3101 throw new UnknownScannerException( 3102 "Unknown scanner '" + scannerName + "'. 
This can happen due to any of the following " 3103 + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " 3104 + "long wait between consecutive client checkins, c) Server may be closing down, " 3105 + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " 3106 + "possible fix would be increasing the value of" 3107 + "'hbase.client.scanner.timeout.period' configuration."); 3108 } 3109 } 3110 rejectIfInStandByState(rsh.r); 3111 RegionInfo hri = rsh.s.getRegionInfo(); 3112 // Yes, should be the same instance 3113 if (server.getOnlineRegion(hri.getRegionName()) != rsh.r) { 3114 String msg = "Region has changed on the scanner " + scannerName + ": regionName=" 3115 + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r; 3116 LOG.warn(msg + ", closing..."); 3117 scanners.remove(scannerName); 3118 try { 3119 rsh.s.close(); 3120 } catch (IOException e) { 3121 LOG.warn("Getting exception closing " + scannerName, e); 3122 } finally { 3123 try { 3124 server.getLeaseManager().cancelLease(scannerName); 3125 } catch (LeaseException e) { 3126 LOG.warn("Getting exception closing " + scannerName, e); 3127 } 3128 } 3129 throw new NotServingRegionException(msg); 3130 } 3131 return rsh; 3132 } 3133 3134 /** 3135 * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder 3136 * value. 3137 */ 3138 private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request, 3139 ScanResponse.Builder builder) throws IOException { 3140 HRegion region = getRegion(request.getRegion()); 3141 rejectIfInStandByState(region); 3142 ClientProtos.Scan protoScan = request.getScan(); 3143 boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); 3144 Scan scan = ProtobufUtil.toScan(protoScan); 3145 // if the request doesn't set this, get the default region setting. 
// Continuation of newRegionScanner(...).
    if (!isLoadingCfsOnDemandSet) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }

    if (!scan.hasFamilies()) {
      // Adding all families to scanner
      for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
        scan.addFamily(family);
      }
    }
    if (region.getCoprocessorHost() != null) {
      // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
      // wrapper for the core created RegionScanner
      region.getCoprocessorHost().preScannerOpen(scan);
    }
    RegionScannerImpl coreScanner = region.getScanner(scan);
    Shipper shipper = coreScanner;
    RegionScanner scanner = coreScanner;
    try {
      if (region.getCoprocessorHost() != null) {
        scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
      }
    } catch (Exception e) {
      // Although region coprocessor is for advanced users and they should take care of the
      // implementation to not damage the HBase system, closing the scanner on exception here does
      // not have any bad side effect, so let's do it
      scanner.close();
      throw e;
    }
    long scannerId = scannerIdGenerator.generateNewScannerId();
    builder.setScannerId(scannerId);
    builder.setMvccReadPoint(scanner.getMvccReadPoint());
    builder.setTtl(scannerLeaseTimeoutPeriod);
    String scannerName = toScannerName(scannerId);

    // Full scans over user tables are tracked separately by the caller.
    boolean fullRegionScan =
      !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region);

    return new Pair<String, RegionScannerHolder>(scannerName,
      addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan));
  }

  /**
   * The returned String is used as key doing look up of outstanding Scanners in this Servers'
   * this.scanners, the Map of outstanding scanners and their current state.
   * @param scannerId A scanner long id.
   * @return The long id as a String.
   */
  private static String toScannerName(long scannerId) {
    return Long.toString(scannerId);
  }

  // Verifies the client-supplied nextCallSeq against the holder's expected value and
  // advances it; throws OutOfOrderScannerNextException on mismatch (bad retry).
  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
    throws OutOfOrderScannerNextException {
    // if nextCallSeq does not match throw Exception straight away. This needs to be
    // performed even before checking of Lease.
    // See HBASE-5974
    if (request.hasNextCallSeq()) {
      long callSeq = request.getNextCallSeq();
      if (!rsh.incNextCallSeq(callSeq)) {
        throw new OutOfOrderScannerNextException(
          "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: "
            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
      }
    }
  }

  // Re-registers a scanner lease that was removed while the RPC was being processed.
  private void addScannerLeaseBack(LeaseManager.Lease lease) {
    try {
      server.getLeaseManager().addLease(lease);
    } catch (LeaseStillHeldException e) {
      // should not happen as the scanner id is unique.
      throw new AssertionError(e);
    }
  }

  // visible for testing only
  long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller,
    boolean allowHeartbeatMessages) {
    // Set the time limit to be half of the more restrictive timeout value (one of the
    // timeout values must be positive). In the event that both values are positive, the
    // more restrictive of the two is used to calculate the limit.
    if (allowHeartbeatMessages) {
      long now = EnvironmentEdgeManager.currentTime();
      long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now);
      if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) {
        long timeLimitDelta;
        if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) {
          timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout);
        } else {
          timeLimitDelta =
            scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout;
        }

        // Use half of whichever timeout value was more restrictive... But don't allow
        // the time limit to be less than the allowable minimum (could cause an
        // immediate timeout before scanning any data).
        timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
        return now + timeLimitDelta;
      }
    }
    // Default value of timeLimit is negative to indicate no timeLimit should be
    // enforced.
    return -1L;
  }

  // Computes how much of the RPC timeout budget remains after time already spent in
  // the call queue; returns -1 when no timeout applies at all.
  private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) {
    long timeout;
    if (controller != null && controller.getCallTimeout() > 0) {
      timeout = controller.getCallTimeout();
    } else if (rpcTimeout > 0) {
      timeout = rpcTimeout;
    } else {
      return -1;
    }
    if (call != null) {
      timeout -= (now - call.getReceiveTime());
    }
    // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high.
    // return minimum value here in that case so we count this in calculating the final delta.
    return Math.max(minimumScanTimeLimitDelta, timeout);
  }

  // Marks the response as "no more results" once the caller-requested row limit is hit.
  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
    ScannerContext scannerContext, ScanResponse.Builder builder) {
    if (numOfCompleteRows >= limitOfRows) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows
          + " scannerContext: " + scannerContext);
      }
      builder.setMoreResults(false);
    }
  }

  // return whether we have more results in region.
/**
   * Core scan loop for one ScanResponse: repeatedly calls RegionScanner#nextRaw under the
   * scanner monitor, honoring size/time/batch/row limits, partial-result and heartbeat
   * client capabilities, and populates {@code results} and {@code builder}.
   */
  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
    long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
    ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
    HRegion region = rsh.r;
    RegionScanner scanner = rsh.s;
    long maxResultSize;
    if (scanner.getMaxResultSize() > 0) {
      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
    } else {
      maxResultSize = maxQuotaResultSize;
    }
    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
    // arbitrary 32. TODO: keep record of general size of results being returned.
    ArrayList<Cell> values = new ArrayList<>(32);
    region.startRegionOperation(Operation.SCAN);
    long before = EnvironmentEdgeManager.currentTime();
    // Used to check if we've matched the row limit set on the Scan
    int numOfCompleteRows = 0;
    // Count of times we call nextRaw; can be > numOfCompleteRows.
    int numOfNextRawCalls = 0;
    try {
      int numOfResults = 0;
      synchronized (scanner) {
        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
        boolean clientHandlesPartials =
          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
        boolean clientHandlesHeartbeats =
          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();

        // On the server side we must ensure that the correct ordering of partial results is
        // returned to the client to allow them to properly reconstruct the partial results.
        // If the coprocessor host is adding to the result list, we cannot guarantee the
        // correct ordering of partial results and so we prevent partial results from being
        // formed.
        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
        boolean moreRows = false;

        // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
        // certain time threshold on the server. When the time threshold is exceeded, the
        // server stops the scan and sends back whatever Results it has accumulated within
        // that time period (may be empty). Since heartbeat messages have the potential to
        // create partial Results (in the event that the timeout occurs in the middle of a
        // row), we must only generate heartbeat messages when the client can handle both
        // heartbeats AND partials
        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;

        long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);

        final LimitScope sizeScope =
          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
        final LimitScope timeScope =
          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;

        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();

        // Configure with limits for this RPC. Set keep progress true since size progress
        // towards size limit should be kept between calls to nextRaw
        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
        // of heap size occupied by cells(being read). Cell data means its key and value parts.
        // maxQuotaResultSize - max results just from server side configuration and quotas, without
        // user's specified max. We use this for evaluating limits based on blocks (not cells).
        // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
        // cell or block size from maximum here so we adhere to total limits of request.
        // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
        // have accumulated block bytes. If not, this will be 0 for block size.
        long maxCellSize = maxResultSize;
        long maxBlockSize = maxQuotaResultSize;
        if (rpcCall != null) {
          maxBlockSize -= rpcCall.getBlockBytesScanned();
          maxCellSize -= rpcCall.getResponseCellSize();
        }

        contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
        contextBuilder.setBatchLimit(scanner.getBatch());
        contextBuilder.setTimeLimit(timeScope, timeLimit);
        contextBuilder.setTrackMetrics(trackMetrics);
        ScannerContext scannerContext = contextBuilder.build();
        boolean limitReached = false;
        while (numOfResults < maxResults) {
          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
          // batch limit is a limit on the number of cells per Result. Thus, if progress is
          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
          // reset the batch progress between nextRaw invocations since we don't want the
          // batch progress from previous calls to affect future calls
          scannerContext.setBatchProgress(0);
          assert values.isEmpty();

          // Collect values to be returned here
          moreRows = scanner.nextRaw(values, scannerContext);
          if (rpcCall == null) {
            // When there is no RpcCallContext,copy EC to heap, then the scanner would close,
            // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
            // buffers.See more details in HBASE-26036.
            CellUtil.cloneIfNecessary(values);
          }
          numOfNextRawCalls++;

          if (!values.isEmpty()) {
            if (limitOfRows > 0) {
              // First we need to check if the last result is partial and we have a row change. If
              // so then we need to increase the numOfCompleteRows.
              if (results.isEmpty()) {
                if (
                  rsh.rowOfLastPartialResult != null
                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              } else {
                Result lastResult = results.get(results.size() - 1);
                if (
                  lastResult.mayHaveMoreCellsInRow()
                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              }
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
            results.add(r);
            numOfResults++;
            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
              numOfCompleteRows++;
              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
          } else if (!moreRows && !results.isEmpty()) {
            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
            // last result is false. Otherwise it's possible that: the first nextRaw returned
            // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
            // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
            // return with moreRows=false, which means moreResultsInRegion would be false, it will
            // be a contradictory state (HBASE-21206).
            int lastIdx = results.size() - 1;
            Result r = results.get(lastIdx);
            if (r.mayHaveMoreCellsInRow()) {
              results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
            }
          }
          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
          boolean resultsLimitReached = numOfResults >= maxResults;
          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

          if (limitReached || !moreRows) {
            // With block size limit, we may exceed size limit without collecting any results.
            // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
            // or cursor if results were collected, for example for cell size or heap size limits.
            boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
            // We only want to mark a ScanResponse as a heartbeat message in the event that
            // there are more values to be read server side. If there aren't more values,
            // marking it as a heartbeat is wasteful because the client will need to issue
            // another ScanRequest only to realize that they already have all the values
            if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
              // Heartbeat messages occur when the time limit has been reached, or size limit has
              // been reached before collecting any results. This can happen for heavily filtered
              // scans which scan over too many blocks.
              builder.setHeartbeatMessage(true);
              if (rsh.needCursor) {
                Cell cursorCell = scannerContext.getLastPeekedCell();
                if (cursorCell != null) {
                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
                }
              }
            }
            break;
          }
          values.clear();
        }
        if (rpcCall != null) {
          rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
        }
        builder.setMoreResultsInRegion(moreRows);
        // Check to see if the client requested that we track metrics server side. If the
        // client requested metrics, retrieve the metrics from the scanner context.
        if (trackMetrics) {
          // rather than increment yet another counter in StoreScanner, just set the value here
          // from block size progress before writing into the response
          scannerContext.getMetrics().countOfBlockBytesScanned
            .set(scannerContext.getBlockSizeProgress());
          if (rpcCall != null) {
            scannerContext.getMetrics().fsReadTime.set(rpcCall.getFsReadTime());
          }
          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();

          for (Entry<String, Long> entry : metrics.entrySet()) {
            pairBuilder.setName(entry.getKey());
            pairBuilder.setValue(entry.getValue());
            metricBuilder.addMetrics(pairBuilder.build());
          }

          builder.setScanMetrics(metricBuilder.build());
        }
      }
    } finally {
      region.closeRegionOperation();
      // Update serverside metrics, even on error.
// Continuation of the private scan(...) finally block: record metrics even on error.
      long end = EnvironmentEdgeManager.currentTime();

      long responseCellSize = 0;
      long blockBytesScanned = 0;
      if (rpcCall != null) {
        responseCellSize = rpcCall.getResponseCellSize();
        blockBytesScanned = rpcCall.getBlockBytesScanned();
        rsh.updateBlockBytesScanned(blockBytesScanned);
      }
      region.getMetrics().updateScan();
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned);
        metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls);
      }
    }
    // coprocessor postNext hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
    }
  }

  /**
   * Scan data in a table.
   * @param controller the RPC controller
   * @param request the scan request
   */
  @Override
  public ScanResponse scan(final RpcController controller, final ScanRequest request)
    throws ServiceException {
    if (controller != null && !(controller instanceof HBaseRpcController)) {
      throw new UnsupportedOperationException(
        "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
    }
    if (!request.hasScannerId() && !request.hasScan()) {
      throw new ServiceException(
        new DoNotRetryIOException("Missing required input: scannerId or scan"));
    }
    try {
      checkOpen();
    } catch (IOException e) {
      // Server is going down: best-effort cancel of the scanner's lease before failing.
      if (request.hasScannerId()) {
        String scannerName = toScannerName(request.getScannerId());
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Server shutting down and client tried to access missing scanner " + scannerName);
        }
        final LeaseManager leaseManager = server.getLeaseManager();
        if (leaseManager != null) {
          try {
            leaseManager.cancelLease(scannerName);
          } catch (LeaseException le) {
            // No problem, ignore
            if (LOG.isTraceEnabled()) {
              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
            }
          }
        }
      }
      throw new ServiceException(e);
    }
    requestCount.increment();
    rpcScanRequestCount.increment();
    RegionScannerHolder rsh;
    ScanResponse.Builder builder = ScanResponse.newBuilder();
    String scannerName;
    try {
      if (request.hasScannerId()) {
        // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
        // for more details.
        long scannerId = request.getScannerId();
        builder.setScannerId(scannerId);
        scannerName = toScannerName(scannerId);
        rsh = getRegionScanner(request);
      } else {
        Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder);
        scannerName = scannerNameAndRSH.getFirst();
        rsh = scannerNameAndRSH.getSecond();
      }
    } catch (IOException e) {
      if (e == SCANNER_ALREADY_CLOSED) {
        // Now we will close scanner automatically if there are no more results for this region but
        // the old client will still send a close request to us. Just ignore it and return.
        return builder.build();
      }
      throw new ServiceException(e);
    }
    if (rsh.fullRegionScan) {
      rpcFullScanRequestCount.increment();
    }
    HRegion region = rsh.r;
    LeaseManager.Lease lease;
    try {
      // Remove lease while its being processed in server; protects against case
      // where processing of request takes > lease expiration time. or null if none found.
      lease = server.getLeaseManager().removeLease(scannerName);
    } catch (LeaseException e) {
      throw new ServiceException(e);
    }
    if (request.hasRenew() && request.getRenew()) {
      // add back and return
      addScannerLeaseBack(lease);
      try {
        checkScanNextCallSeq(request, rsh);
      } catch (OutOfOrderScannerNextException e) {
        throw new ServiceException(e);
      }
      return builder.build();
    }
    OperationQuota quota;
    try {
      quota = getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize,
        rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference());
    } catch (IOException e) {
      addScannerLeaseBack(lease);
      throw new ServiceException(e);
    }
    try {
      checkScanNextCallSeq(request, rsh);
    } catch (OutOfOrderScannerNextException e) {
      addScannerLeaseBack(lease);
      throw new ServiceException(e);
    }
    // Now we have increased the next call sequence. If we give client an error, the retry will
    // never success. So we'd better close the scanner and return a DoNotRetryIOException to client
    // and then client will try to open a new scanner.
    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
    int rows; // this is scan.getCaching
    if (request.hasNumberOfRows()) {
      rows = request.getNumberOfRows();
    } else {
      rows = closeScanner ? 0 : 1;
    }
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    // now let's do the real scan.
    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
    RegionScanner scanner = rsh.s;
    // this is the limit of rows for this scan, if we the number of rows reach this value, we will
    // close the scanner.
    int limitOfRows;
    if (request.hasLimitOfRows()) {
      limitOfRows = request.getLimitOfRows();
    } else {
      limitOfRows = -1;
    }
    boolean scannerClosed = false;
    try {
      List<Result> results = new ArrayList<>(Math.min(rows, 512));
      if (rows > 0) {
        boolean done = false;
        // Call coprocessor. Get region info from scanner.
        if (region.getCoprocessorHost() != null) {
          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
          if (!results.isEmpty()) {
            for (Result r : results) {
              // add cell size from CP results so we can track response size and update limits
              // when calling scan below if !done. We'll also have tracked block size if the CP
              // got results from hbase, since StoreScanner tracks that for all calls automatically.
              addSize(rpcCall, r);
            }
          }
          if (bypass != null && bypass.booleanValue()) {
            done = true;
          }
        }
        if (!done) {
          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
            results, builder, rpcCall);
        } else {
          builder.setMoreResultsInRegion(!results.isEmpty());
        }
      } else {
        // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
        builder.setMoreResultsInRegion(true);
      }

      quota.addScanResult(results);
      addResults(builder, results, (HBaseRpcController) controller,
        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
        isClientCellBlockSupport(rpcCall));
      if (scanner.isFilterDone() && results.isEmpty()) {
        // If the scanner's filter - if any - is done with the scan
        // only set moreResults to false if the results is empty. This is used to keep compatible
        // with the old scan implementation where we just ignore the returned results if moreResults
        // is false. Can remove the isEmpty check after we get rid of the old implementation.
        builder.setMoreResults(false);
      }
      // Later we may close the scanner depending on this flag so here we need to make sure that we
      // have already set this flag.
      assert builder.hasMoreResultsInRegion();
      // we only set moreResults to false in the above code, so set it to true if we haven't set it
      // yet.
      if (!builder.hasMoreResults()) {
        builder.setMoreResults(true);
      }
      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
        // Record the last cell of the last result if it is a partial result
        // We need this to calculate the complete rows we have returned to client as the
        // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
        // current row. We may filter out all the remaining cells for the current row and just
        // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
        // check for row change.
        Result lastResult = results.get(results.size() - 1);
        if (lastResult.mayHaveMoreCellsInRow()) {
          rsh.rowOfLastPartialResult = lastResult.getRow();
        } else {
          rsh.rowOfLastPartialResult = null;
        }
      }
      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
        scannerClosed = true;
        closeScanner(region, scanner, scannerName, rpcCall, false);
      }

      // There's no point returning to a timed out client. Throwing ensures scanner is closed
      if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) {
        throw new TimeoutIOException("Client deadline exceeded, cannot return results");
      }

      return builder.build();
    } catch (IOException e) {
      try {
        // scanner is closed here
        scannerClosed = true;
        // The scanner state might be left in a dirty state, so we will tell the Client to
        // fail this RPC and close the scanner while opening up another one from the start of
        // row that the client has last seen.
        closeScanner(region, scanner, scannerName, rpcCall, true);

        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
        // used in two different semantics.
        // (1) The first is to close the client scanner and bubble up the exception all the way
        // to the application. This is preferred when the exception is really un-recoverable
        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
        // bucket usually.
        // (2) Second semantics is to close the current region scanner only, but continue the
        // client scanner by overriding the exception. This is usually UnknownScannerException,
        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
        // application-level ClientScanner has to continue without bubbling up the exception to
        // the client. See ClientScanner code to see how it deals with these special exceptions.
        if (e instanceof DoNotRetryIOException) {
          throw e;
        }

        // If it is a FileNotFoundException, wrap as a
        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
        if (e instanceof FileNotFoundException) {
          throw new DoNotRetryIOException(e);
        }

        // We closed the scanner already. Instead of throwing the IOException, and client
        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
        // a special exception to save an RPC.
        // NOTE(review): rpcCall was obtained via RpcServer.getCurrentCall().orElse(null) and is
        // null-checked elsewhere in this method — confirm it cannot be null on this path, else
        // rpcCall.getClientVersionInfo() would NPE inside this catch block.
        if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
          // 1.4.0+ clients know how to handle
          throw new ScannerResetException("Scanner is closed on the server-side", e);
        } else {
          // older clients do not know about SRE. Just throw USE, which they will handle
          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
            + " scanner state for clients older than 1.3.", e);
        }
      } catch (IOException ioe) {
        throw new ServiceException(ioe);
      }
    } finally {
      if (!scannerClosed) {
        // Adding resets expiration time on lease.
        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
        if (rpcCall != null) {
          rpcCall.setCallBack(rsh.shippedCallback);
        } else {
          // If context is null,here we call rsh.shippedCallback directly to reuse the logic in
          // rsh.shippedCallback to release the internal resources in rsh,and lease is also added
          // back to regionserver's LeaseManager in rsh.shippedCallback.
          runShippedCallback(rsh);
        }
      }
      quota.close();
    }
  }

  // Runs the holder's shipped callback directly (non-RPC path) to release the scanner's
  // internal resources and return its lease to the LeaseManager.
  private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
    assert rsh.shippedCallback != null;
    try {
      rsh.shippedCallback.run();
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
  }

  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
    RpcCallContext context, boolean isError) throws IOException {
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preScannerClose(scanner)) {
        // bypass the actual close.
3788 return; 3789 } 3790 } 3791 RegionScannerHolder rsh = scanners.remove(scannerName); 3792 if (rsh != null) { 3793 if (context != null) { 3794 context.setCallBack(rsh.closeCallBack); 3795 } else { 3796 rsh.s.close(); 3797 } 3798 if (region.getCoprocessorHost() != null) { 3799 region.getCoprocessorHost().postScannerClose(scanner); 3800 } 3801 if (!isError) { 3802 closedScanners.put(scannerName, rsh.getNextCallSeq()); 3803 } 3804 } 3805 } 3806 3807 @Override 3808 public CoprocessorServiceResponse execRegionServerService(RpcController controller, 3809 CoprocessorServiceRequest request) throws ServiceException { 3810 rpcPreCheck("execRegionServerService"); 3811 return server.execRegionServerService(controller, request); 3812 } 3813 3814 @Override 3815 public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller, 3816 GetSpaceQuotaSnapshotsRequest request) throws ServiceException { 3817 try { 3818 final RegionServerSpaceQuotaManager manager = server.getRegionServerSpaceQuotaManager(); 3819 final GetSpaceQuotaSnapshotsResponse.Builder builder = 3820 GetSpaceQuotaSnapshotsResponse.newBuilder(); 3821 if (manager != null) { 3822 final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots(); 3823 for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) { 3824 builder.addSnapshots(TableQuotaSnapshot.newBuilder() 3825 .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey())) 3826 .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build()); 3827 } 3828 } 3829 return builder.build(); 3830 } catch (Exception e) { 3831 throw new ServiceException(e); 3832 } 3833 } 3834 3835 @Override 3836 public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller, 3837 ClearRegionBlockCacheRequest request) throws ServiceException { 3838 try { 3839 rpcPreCheck("clearRegionBlockCache"); 3840 ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder(); 3841 
CacheEvictionStatsBuilder stats = CacheEvictionStats.builder(); 3842 server.getRegionServerCoprocessorHost().preClearRegionBlockCache(); 3843 List<HRegion> regions = getRegions(request.getRegionList(), stats); 3844 for (HRegion region : regions) { 3845 try { 3846 stats = stats.append(this.server.clearRegionBlockCache(region)); 3847 } catch (Exception e) { 3848 stats.addException(region.getRegionInfo().getRegionName(), e); 3849 } 3850 } 3851 stats.withMaxCacheSize(server.getBlockCache().map(BlockCache::getMaxSize).orElse(0L)); 3852 server.getRegionServerCoprocessorHost().postClearRegionBlockCache(stats.build()); 3853 return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build(); 3854 } catch (IOException e) { 3855 throw new ServiceException(e); 3856 } 3857 } 3858 3859 private void executeOpenRegionProcedures(OpenRegionRequest request, 3860 Map<TableName, TableDescriptor> tdCache) { 3861 long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1; 3862 for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) { 3863 RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion()); 3864 TableName tableName = regionInfo.getTable(); 3865 TableDescriptor tableDesc = tdCache.get(tableName); 3866 if (tableDesc == null) { 3867 try { 3868 tableDesc = server.getTableDescriptors().get(regionInfo.getTable()); 3869 } catch (IOException e) { 3870 // Here we do not fail the whole method since we also need deal with other 3871 // procedures, and we can not ignore this one, so we still schedule a 3872 // AssignRegionHandler and it will report back to master if we still can not get the 3873 // TableDescriptor. 
3874 LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler", 3875 regionInfo.getTable(), e); 3876 } 3877 if (tableDesc != null) { 3878 tdCache.put(tableName, tableDesc); 3879 } 3880 } 3881 if (regionOpenInfo.getFavoredNodesCount() > 0) { 3882 server.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(), 3883 regionOpenInfo.getFavoredNodesList()); 3884 } 3885 long procId = regionOpenInfo.getOpenProcId(); 3886 if (server.submitRegionProcedure(procId)) { 3887 server.getExecutorService().submit( 3888 AssignRegionHandler.create(server, regionInfo, procId, tableDesc, masterSystemTime)); 3889 } 3890 } 3891 } 3892 3893 private void executeCloseRegionProcedures(CloseRegionRequest request) { 3894 String encodedName; 3895 try { 3896 encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion()); 3897 } catch (DoNotRetryIOException e) { 3898 throw new UncheckedIOException("Should not happen", e); 3899 } 3900 ServerName destination = request.hasDestinationServer() 3901 ? 
ProtobufUtil.toServerName(request.getDestinationServer()) 3902 : null; 3903 long procId = request.getCloseProcId(); 3904 boolean evictCache = request.getEvictCache(); 3905 if (server.submitRegionProcedure(procId)) { 3906 server.getExecutorService().submit( 3907 UnassignRegionHandler.create(server, encodedName, procId, false, destination, evictCache)); 3908 } 3909 } 3910 3911 private void executeProcedures(RemoteProcedureRequest request) { 3912 RSProcedureCallable callable; 3913 try { 3914 callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class) 3915 .getDeclaredConstructor().newInstance(); 3916 } catch (Exception e) { 3917 LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(), 3918 request.getProcId(), e); 3919 server.remoteProcedureComplete(request.getProcId(), e); 3920 return; 3921 } 3922 callable.init(request.getProcData().toByteArray(), server); 3923 LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId()); 3924 server.executeProcedure(request.getProcId(), callable); 3925 } 3926 3927 @Override 3928 @QosPriority(priority = HConstants.ADMIN_QOS) 3929 public ExecuteProceduresResponse executeProcedures(RpcController controller, 3930 ExecuteProceduresRequest request) throws ServiceException { 3931 try { 3932 checkOpen(); 3933 throwOnWrongStartCode(request); 3934 server.getRegionServerCoprocessorHost().preExecuteProcedures(); 3935 if (request.getOpenRegionCount() > 0) { 3936 // Avoid reading from the TableDescritor every time(usually it will read from the file 3937 // system) 3938 Map<TableName, TableDescriptor> tdCache = new HashMap<>(); 3939 request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache)); 3940 } 3941 if (request.getCloseRegionCount() > 0) { 3942 request.getCloseRegionList().forEach(this::executeCloseRegionProcedures); 3943 } 3944 if (request.getProcCount() > 0) { 3945 request.getProcList().forEach(this::executeProcedures); 3946 } 
3947 server.getRegionServerCoprocessorHost().postExecuteProcedures(); 3948 return ExecuteProceduresResponse.getDefaultInstance(); 3949 } catch (IOException e) { 3950 throw new ServiceException(e); 3951 } 3952 } 3953 3954 @Override 3955 public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller, 3956 GetAllBootstrapNodesRequest request) throws ServiceException { 3957 GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder(); 3958 server.getBootstrapNodes() 3959 .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server))); 3960 return builder.build(); 3961 } 3962 3963 private void setReloadableGuardrails(Configuration conf) { 3964 rowSizeWarnThreshold = 3965 conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT); 3966 rejectRowsWithSizeOverThreshold = 3967 conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD); 3968 maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 3969 HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); 3970 } 3971 3972 @Override 3973 public void onConfigurationChange(Configuration conf) { 3974 super.onConfigurationChange(conf); 3975 setReloadableGuardrails(conf); 3976 } 3977 3978 @Override 3979 public GetCachedFilesListResponse getCachedFilesList(RpcController controller, 3980 GetCachedFilesListRequest request) throws ServiceException { 3981 GetCachedFilesListResponse.Builder responseBuilder = GetCachedFilesListResponse.newBuilder(); 3982 List<String> fullyCachedFiles = new ArrayList<>(); 3983 server.getBlockCache().flatMap(BlockCache::getFullyCachedFiles).ifPresent(fcf -> { 3984 fullyCachedFiles.addAll(fcf.keySet()); 3985 }); 3986 return responseBuilder.addAllCachedFiles(fullyCachedFiles).build(); 3987 } 3988}