001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.UncheckedIOException; 024import java.net.BindException; 025import java.net.InetAddress; 026import java.net.InetSocketAddress; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.Iterator; 032import java.util.List; 033import java.util.Map; 034import java.util.Map.Entry; 035import java.util.NavigableMap; 036import java.util.Set; 037import java.util.TreeSet; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentMap; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicBoolean; 042import java.util.concurrent.atomic.AtomicLong; 043import java.util.concurrent.atomic.LongAdder; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.fs.FileSystem; 046import org.apache.hadoop.fs.Path; 047import org.apache.hadoop.hbase.CacheEvictionStats; 048import 
org.apache.hadoop.hbase.CacheEvictionStatsBuilder; 049import org.apache.hadoop.hbase.Cell; 050import org.apache.hadoop.hbase.CellScanner; 051import org.apache.hadoop.hbase.CellUtil; 052import org.apache.hadoop.hbase.DoNotRetryIOException; 053import org.apache.hadoop.hbase.DroppedSnapshotException; 054import org.apache.hadoop.hbase.ExtendedCellScannable; 055import org.apache.hadoop.hbase.ExtendedCellScanner; 056import org.apache.hadoop.hbase.HBaseIOException; 057import org.apache.hadoop.hbase.HBaseRpcServicesBase; 058import org.apache.hadoop.hbase.HConstants; 059import org.apache.hadoop.hbase.MultiActionResultTooLarge; 060import org.apache.hadoop.hbase.NotServingRegionException; 061import org.apache.hadoop.hbase.PrivateCellUtil; 062import org.apache.hadoop.hbase.RegionTooBusyException; 063import org.apache.hadoop.hbase.Server; 064import org.apache.hadoop.hbase.ServerName; 065import org.apache.hadoop.hbase.TableName; 066import org.apache.hadoop.hbase.UnknownScannerException; 067import org.apache.hadoop.hbase.client.Append; 068import org.apache.hadoop.hbase.client.CheckAndMutate; 069import org.apache.hadoop.hbase.client.CheckAndMutateResult; 070import org.apache.hadoop.hbase.client.Delete; 071import org.apache.hadoop.hbase.client.Durability; 072import org.apache.hadoop.hbase.client.Get; 073import org.apache.hadoop.hbase.client.Increment; 074import org.apache.hadoop.hbase.client.Mutation; 075import org.apache.hadoop.hbase.client.OperationWithAttributes; 076import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor; 077import org.apache.hadoop.hbase.client.Put; 078import org.apache.hadoop.hbase.client.RegionInfo; 079import org.apache.hadoop.hbase.client.RegionReplicaUtil; 080import org.apache.hadoop.hbase.client.Result; 081import org.apache.hadoop.hbase.client.Row; 082import org.apache.hadoop.hbase.client.Scan; 083import org.apache.hadoop.hbase.client.TableDescriptor; 084import org.apache.hadoop.hbase.client.VersionInfoUtil; 085import 
org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 086import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 087import org.apache.hadoop.hbase.exceptions.ScannerResetException; 088import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 089import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 090import org.apache.hadoop.hbase.io.ByteBuffAllocator; 091import org.apache.hadoop.hbase.io.hfile.BlockCache; 092import org.apache.hadoop.hbase.ipc.HBaseRpcController; 093import org.apache.hadoop.hbase.ipc.PriorityFunction; 094import org.apache.hadoop.hbase.ipc.QosPriority; 095import org.apache.hadoop.hbase.ipc.RpcCall; 096import org.apache.hadoop.hbase.ipc.RpcCallContext; 097import org.apache.hadoop.hbase.ipc.RpcCallback; 098import org.apache.hadoop.hbase.ipc.RpcServer; 099import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 100import org.apache.hadoop.hbase.ipc.RpcServerFactory; 101import org.apache.hadoop.hbase.ipc.RpcServerInterface; 102import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 103import org.apache.hadoop.hbase.ipc.ServerRpcController; 104import org.apache.hadoop.hbase.net.Address; 105import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 106import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; 107import org.apache.hadoop.hbase.quotas.OperationQuota; 108import org.apache.hadoop.hbase.quotas.QuotaUtil; 109import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 110import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 111import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 112import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; 113import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease; 114import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException; 115import org.apache.hadoop.hbase.regionserver.Region.Operation; 116import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 
117import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 118import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler; 119import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; 120import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; 121import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; 122import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; 123import org.apache.hadoop.hbase.replication.ReplicationUtils; 124import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker; 125import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker; 126import org.apache.hadoop.hbase.security.Superusers; 127import org.apache.hadoop.hbase.security.access.Permission; 128import org.apache.hadoop.hbase.util.Bytes; 129import org.apache.hadoop.hbase.util.DNS; 130import org.apache.hadoop.hbase.util.DNS.ServerType; 131import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 132import org.apache.hadoop.hbase.util.Pair; 133import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 134import org.apache.hadoop.hbase.wal.WAL; 135import org.apache.hadoop.hbase.wal.WALEdit; 136import org.apache.hadoop.hbase.wal.WALKey; 137import org.apache.hadoop.hbase.wal.WALSplitUtil; 138import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 139import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 140import org.apache.yetus.audience.InterfaceAudience; 141import org.slf4j.Logger; 142import org.slf4j.LoggerFactory; 143 144import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 145import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 146import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 147import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 148import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 149import 
org.apache.hbase.thirdparty.com.google.protobuf.Message; 150import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 151import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 152import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 153import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 154import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 155 156import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 157import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 158import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 172import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 192import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 213import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; 235import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 247 248/** 249 * Implements the regionserver RPC services. 250 */ 251@InterfaceAudience.Private 252public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer> 253 implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface { 254 255 private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class); 256 257 /** RPC scheduler to use for the region server. */ 258 public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = 259 "hbase.region.server.rpc.scheduler.factory.class"; 260 261 /** 262 * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This 263 * configuration exists to prevent the scenario where a time limit is specified to be so 264 * restrictive that the time limit is reached immediately (before any cells are scanned). 
265 */ 266 private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 267 "hbase.region.server.rpc.minimum.scan.time.limit.delta"; 268 /** 269 * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA} 270 */ 271 static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10; 272 273 /** 274 * Whether to reject rows with size > threshold defined by 275 * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME} 276 */ 277 private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD = 278 "hbase.rpc.rows.size.threshold.reject"; 279 280 /** 281 * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD} 282 */ 283 private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false; 284 285 // Request counter. (Includes requests that are not serviced by regions.) 286 // Count only once for requests with multiple actions like multi/caching-scan/replayBatch 287 final LongAdder requestCount = new LongAdder(); 288 289 // Request counter for rpc get 290 final LongAdder rpcGetRequestCount = new LongAdder(); 291 292 // Request counter for rpc scan 293 final LongAdder rpcScanRequestCount = new LongAdder(); 294 295 // Request counter for scans that might end up in full scans 296 final LongAdder rpcFullScanRequestCount = new LongAdder(); 297 298 // Request counter for rpc multi 299 final LongAdder rpcMultiRequestCount = new LongAdder(); 300 301 // Request counter for rpc mutate 302 final LongAdder rpcMutateRequestCount = new LongAdder(); 303 304 private volatile long maxScannerResultSize; 305 306 private ScannerIdGenerator scannerIdGenerator; 307 private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>(); 308 // Hold the name and last sequence number of a closed scanner for a while. This is used 309 // to keep compatible for old clients which may send next or close request to a region 310 // scanner which has already been exhausted. 
The entries will be removed automatically 311 // after scannerLeaseTimeoutPeriod. 312 private final Cache<String, Long> closedScanners; 313 /** 314 * The lease timeout period for client scanners (milliseconds). 315 */ 316 private final int scannerLeaseTimeoutPeriod; 317 318 /** 319 * The RPC timeout period (milliseconds) 320 */ 321 private final int rpcTimeout; 322 323 /** 324 * The minimum allowable delta to use for the scan limit 325 */ 326 private final long minimumScanTimeLimitDelta; 327 328 /** 329 * Row size threshold for multi requests above which a warning is logged 330 */ 331 private volatile int rowSizeWarnThreshold; 332 /* 333 * Whether we should reject requests with very high no of rows i.e. beyond threshold defined by 334 * rowSizeWarnThreshold 335 */ 336 private volatile boolean rejectRowsWithSizeOverThreshold; 337 338 final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false); 339 340 /** 341 * Services launched in RSRpcServices. By default they are on but you can use the below booleans 342 * to selectively enable/disable these services (Rare is the case where you would ever turn off 343 * one or the other). 344 */ 345 public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG = 346 "hbase.regionserver.admin.executorService"; 347 public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG = 348 "hbase.regionserver.client.executorService"; 349 public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG = 350 "hbase.regionserver.client.meta.executorService"; 351 public static final String REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG = 352 "hbase.regionserver.bootstrap.nodes.executorService"; 353 354 /** 355 * An Rpc callback for closing a RegionScanner. 
356 */ 357 private static final class RegionScannerCloseCallBack implements RpcCallback { 358 359 private final RegionScanner scanner; 360 361 public RegionScannerCloseCallBack(RegionScanner scanner) { 362 this.scanner = scanner; 363 } 364 365 @Override 366 public void run() throws IOException { 367 this.scanner.close(); 368 } 369 } 370 371 /** 372 * An Rpc callback for doing shipped() call on a RegionScanner. 373 */ 374 private class RegionScannerShippedCallBack implements RpcCallback { 375 private final String scannerName; 376 private final Shipper shipper; 377 private final Lease lease; 378 379 public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) { 380 this.scannerName = scannerName; 381 this.shipper = shipper; 382 this.lease = lease; 383 } 384 385 @Override 386 public void run() throws IOException { 387 this.shipper.shipped(); 388 // We're done. On way out re-add the above removed lease. The lease was temp removed for this 389 // Rpc call and we are at end of the call now. Time to add it back. 390 if (scanners.containsKey(scannerName)) { 391 if (lease != null) { 392 server.getLeaseManager().addLease(lease); 393 } 394 } 395 } 396 } 397 398 /** 399 * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on 400 * completion of multiGets. 401 */ 402 static class RegionScannersCloseCallBack implements RpcCallback { 403 private final List<RegionScanner> scanners = new ArrayList<>(); 404 405 public void addScanner(RegionScanner scanner) { 406 this.scanners.add(scanner); 407 } 408 409 @Override 410 public void run() { 411 for (RegionScanner scanner : scanners) { 412 try { 413 scanner.close(); 414 } catch (IOException e) { 415 LOG.error("Exception while closing the scanner " + scanner, e); 416 } 417 } 418 } 419 } 420 421 /** 422 * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together. 
423 */ 424 static final class RegionScannerHolder { 425 private final AtomicLong nextCallSeq = new AtomicLong(0); 426 private final RegionScanner s; 427 private final HRegion r; 428 private final RpcCallback closeCallBack; 429 private final RpcCallback shippedCallback; 430 private byte[] rowOfLastPartialResult; 431 private boolean needCursor; 432 private boolean fullRegionScan; 433 private final String clientIPAndPort; 434 private final String userName; 435 private volatile long maxBlockBytesScanned = 0; 436 private volatile long prevBlockBytesScanned = 0; 437 private volatile long prevBlockBytesScannedDifference = 0; 438 439 RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack, 440 RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan, 441 String clientIPAndPort, String userName) { 442 this.s = s; 443 this.r = r; 444 this.closeCallBack = closeCallBack; 445 this.shippedCallback = shippedCallback; 446 this.needCursor = needCursor; 447 this.fullRegionScan = fullRegionScan; 448 this.clientIPAndPort = clientIPAndPort; 449 this.userName = userName; 450 } 451 452 long getNextCallSeq() { 453 return nextCallSeq.get(); 454 } 455 456 boolean incNextCallSeq(long currentSeq) { 457 // Use CAS to prevent multiple scan request running on the same scanner. 458 return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1); 459 } 460 461 long getMaxBlockBytesScanned() { 462 return maxBlockBytesScanned; 463 } 464 465 long getPrevBlockBytesScannedDifference() { 466 return prevBlockBytesScannedDifference; 467 } 468 469 void updateBlockBytesScanned(long blockBytesScanned) { 470 prevBlockBytesScannedDifference = blockBytesScanned - prevBlockBytesScanned; 471 prevBlockBytesScanned = blockBytesScanned; 472 if (blockBytesScanned > maxBlockBytesScanned) { 473 maxBlockBytesScanned = blockBytesScanned; 474 } 475 } 476 477 // Should be called only when we need to print lease expired messages otherwise 478 // cache the String once made. 
479 @Override 480 public String toString() { 481 return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName 482 + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString(); 483 } 484 } 485 486 /** 487 * Instantiated as a scanner lease. If the lease times out, the scanner is closed 488 */ 489 private class ScannerListener implements LeaseListener { 490 private final String scannerName; 491 492 ScannerListener(final String n) { 493 this.scannerName = n; 494 } 495 496 @Override 497 public void leaseExpired() { 498 RegionScannerHolder rsh = scanners.remove(this.scannerName); 499 if (rsh == null) { 500 LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName); 501 return; 502 } 503 LOG.info("Scanner lease {} expired {}", this.scannerName, rsh); 504 server.getMetrics().incrScannerLeaseExpired(); 505 RegionScanner s = rsh.s; 506 HRegion region = null; 507 try { 508 region = server.getRegion(s.getRegionInfo().getRegionName()); 509 if (region != null && region.getCoprocessorHost() != null) { 510 region.getCoprocessorHost().preScannerClose(s); 511 } 512 } catch (IOException e) { 513 LOG.error("Closing scanner {} {}", this.scannerName, rsh, e); 514 } finally { 515 try { 516 s.close(); 517 if (region != null && region.getCoprocessorHost() != null) { 518 region.getCoprocessorHost().postScannerClose(s); 519 } 520 } catch (IOException e) { 521 LOG.error("Closing scanner {} {}", this.scannerName, rsh, e); 522 } 523 } 524 } 525 } 526 527 private static ResultOrException getResultOrException(final ClientProtos.Result r, 528 final int index) { 529 return getResultOrException(ResponseConverter.buildActionResult(r), index); 530 } 531 532 private static ResultOrException getResultOrException(final Exception e, final int index) { 533 return getResultOrException(ResponseConverter.buildActionResult(e), index); 534 } 535 536 private static ResultOrException getResultOrException(final ResultOrException.Builder builder, 537 final int index) 
{ 538 return builder.setIndex(index).build(); 539 } 540 541 /** 542 * Checks for the following pre-checks in order: 543 * <ol> 544 * <li>RegionServer is running</li> 545 * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li> 546 * </ol> 547 * @param requestName name of rpc request. Used in reporting failures to provide context. 548 * @throws ServiceException If any of the above listed pre-check fails. 549 */ 550 private void rpcPreCheck(String requestName) throws ServiceException { 551 try { 552 checkOpen(); 553 requirePermission(requestName, Permission.Action.ADMIN); 554 } catch (IOException ioe) { 555 throw new ServiceException(ioe); 556 } 557 } 558 559 private boolean isClientCellBlockSupport(RpcCallContext context) { 560 return context != null && context.isClientCellBlockSupported(); 561 } 562 563 private void addResult(final MutateResponse.Builder builder, final Result result, 564 final HBaseRpcController rpcc, boolean clientCellBlockSupported) { 565 if (result == null) return; 566 if (clientCellBlockSupported) { 567 builder.setResult(ProtobufUtil.toResultNoData(result)); 568 rpcc.setCellScanner(result.cellScanner()); 569 } else { 570 ClientProtos.Result pbr = ProtobufUtil.toResult(result); 571 builder.setResult(pbr); 572 } 573 } 574 575 private void addResults(ScanResponse.Builder builder, List<Result> results, 576 HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) { 577 builder.setStale(!isDefaultRegion); 578 if (results.isEmpty()) { 579 return; 580 } 581 if (clientCellBlockSupported) { 582 for (Result res : results) { 583 builder.addCellsPerResult(res.size()); 584 builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow()); 585 } 586 controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(results)); 587 } else { 588 for (Result res : results) { 589 ClientProtos.Result pbr = ProtobufUtil.toResult(res); 590 builder.addResults(pbr); 591 } 592 } 593 } 594 595 private CheckAndMutateResult 
checkAndMutate(HRegion region, List<ClientProtos.Action> actions, 596 CellScanner cellScanner, Condition condition, long nonceGroup, 597 ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { 598 int countOfCompleteMutation = 0; 599 try { 600 if (!region.getRegionInfo().isMetaRegion()) { 601 server.getMemStoreFlusher().reclaimMemStoreMemory(); 602 } 603 List<Mutation> mutations = new ArrayList<>(); 604 long nonce = HConstants.NO_NONCE; 605 for (ClientProtos.Action action : actions) { 606 if (action.hasGet()) { 607 throw new DoNotRetryIOException( 608 "Atomic put and/or delete only, not a Get=" + action.getGet()); 609 } 610 MutationProto mutation = action.getMutation(); 611 MutationType type = mutation.getMutateType(); 612 switch (type) { 613 case PUT: 614 Put put = ProtobufUtil.toPut(mutation, cellScanner); 615 ++countOfCompleteMutation; 616 checkCellSizeLimit(region, put); 617 spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); 618 mutations.add(put); 619 break; 620 case DELETE: 621 Delete del = ProtobufUtil.toDelete(mutation, cellScanner); 622 ++countOfCompleteMutation; 623 spaceQuotaEnforcement.getPolicyEnforcement(region).check(del); 624 mutations.add(del); 625 break; 626 case INCREMENT: 627 Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner); 628 nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 629 ++countOfCompleteMutation; 630 checkCellSizeLimit(region, increment); 631 spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment); 632 mutations.add(increment); 633 break; 634 case APPEND: 635 Append append = ProtobufUtil.toAppend(mutation, cellScanner); 636 nonce = mutation.hasNonce() ? 
mutation.getNonce() : HConstants.NO_NONCE; 637 ++countOfCompleteMutation; 638 checkCellSizeLimit(region, append); 639 spaceQuotaEnforcement.getPolicyEnforcement(region).check(append); 640 mutations.add(append); 641 break; 642 default: 643 throw new DoNotRetryIOException("invalid mutation type : " + type); 644 } 645 } 646 647 if (mutations.size() == 0) { 648 return new CheckAndMutateResult(true, null); 649 } else { 650 CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations); 651 CheckAndMutateResult result = null; 652 if (region.getCoprocessorHost() != null) { 653 result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate); 654 } 655 if (result == null) { 656 result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce); 657 if (region.getCoprocessorHost() != null) { 658 result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result); 659 } 660 } 661 return result; 662 } 663 } finally { 664 // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner 665 // even if the malformed cells are not skipped. 666 for (int i = countOfCompleteMutation; i < actions.size(); ++i) { 667 skipCellsForMutation(actions.get(i), cellScanner); 668 } 669 } 670 } 671 672 /** 673 * Execute an append mutation. 674 * @return result to return to client if default operation should be bypassed as indicated by 675 * RegionObserver, null otherwise 676 */ 677 private Result append(final HRegion region, final OperationQuota quota, 678 final MutationProto mutation, final CellScanner cellScanner, long nonceGroup, 679 ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException { 680 long before = EnvironmentEdgeManager.currentTime(); 681 Append append = ProtobufUtil.toAppend(mutation, cellScanner); 682 checkCellSizeLimit(region, append); 683 spaceQuota.getPolicyEnforcement(region).check(append); 684 quota.addMutation(append); 685 long blockBytesScannedBefore = context != null ? 
context.getBlockBytesScanned() : 0; 686 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 687 Result r = region.append(append, nonceGroup, nonce); 688 if (server.getMetrics() != null) { 689 long blockBytesScanned = 690 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 691 server.getMetrics().updateAppend(region, EnvironmentEdgeManager.currentTime() - before, 692 blockBytesScanned); 693 } 694 return r == null ? Result.EMPTY_RESULT : r; 695 } 696 697 /** 698 * Execute an increment mutation. 699 */ 700 private Result increment(final HRegion region, final OperationQuota quota, 701 final MutationProto mutation, final CellScanner cells, long nonceGroup, 702 ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException { 703 long before = EnvironmentEdgeManager.currentTime(); 704 Increment increment = ProtobufUtil.toIncrement(mutation, cells); 705 checkCellSizeLimit(region, increment); 706 spaceQuota.getPolicyEnforcement(region).check(increment); 707 quota.addMutation(increment); 708 long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0; 709 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 710 Result r = region.increment(increment, nonceGroup, nonce); 711 final MetricsRegionServer metricsRegionServer = server.getMetrics(); 712 if (metricsRegionServer != null) { 713 long blockBytesScanned = 714 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 715 metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before, 716 blockBytesScanned); 717 } 718 return r == null ? Result.EMPTY_RESULT : r; 719 } 720 721 /** 722 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when 723 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation. 724 * @param cellsToReturn Could be null. May be allocated in this method. 
This is what this method
   *                      returns as a 'result'.
   * @param closeCallBack the callback to be used with multigets
   * @param context       the current RpcCallContext
   * @return Return the <code>cellScanner</code> passed
   */
  private List<ExtendedCellScannable> doNonAtomicRegionMutation(final HRegion region,
    final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
    final RegionActionResult.Builder builder, List<ExtendedCellScannable> cellsToReturn,
    long nonceGroup, final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
    // one at a time, we instead pass them in batch. Be aware that the corresponding
    // ResultOrException instance that matches each Put or Delete is then added down in the
    // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
    // deferred/batched
    List<ClientProtos.Action> mutations = null;
    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
    IOException sizeIOE = null;
    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
      ResultOrException.newBuilder();
    boolean hasResultOrException = false;
    for (ClientProtos.Action action : actions.getActionList()) {
      hasResultOrException = false;
      resultOrExceptionBuilder.clear();
      try {
        Result r = null;
        long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
        if (
          context != null && context.isRetryImmediatelySupported()
            && (context.getResponseCellSize() > maxQuotaResultSize
              || blockBytesScannedBefore + context.getResponseExceptionSize() > maxQuotaResultSize)
        ) {

          // We're storing the exception since the exception and reason string won't
          // change after the response size limit is reached.
          if (sizeIOE == null) {
            // We don't need the stack un-winding so don't throw the exception.
            // Throwing will kill the JVM's JIT.
            //
            // Instead just create the exception and then store it.
            sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
              + context.getResponseCellSize() + " BlockSize: " + blockBytesScannedBefore);

            // Only report the exception once since there's only one request that
            // caused the exception. Otherwise this number will dominate the exceptions count.
            rpcServer.getMetrics().exception(sizeIOE);
          }

          // Now that the exception is known to be created,
          // use it for the response.
          //
          // This will create a copy in the builder.
          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
          resultOrExceptionBuilder.setException(pair);
          context.incrementResponseExceptionSize(pair.getSerializedSize());
          resultOrExceptionBuilder.setIndex(action.getIndex());
          builder.addResultOrException(resultOrExceptionBuilder.build());
          // Keep the shared cell scanner aligned even though this action is rejected.
          skipCellsForMutation(action, cellScanner);
          continue;
        }
        if (action.hasGet()) {
          long before = EnvironmentEdgeManager.currentTime();
          ClientProtos.Get pbGet = action.getGet();
          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
          // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
              + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
              + "reverse Scan.");
          }
          try {
            Get get = ProtobufUtil.toGet(pbGet);
            if (context != null) {
              r = get(get, (region), closeCallBack, context);
            } else {
              r = region.get(get);
            }
          } finally {
            final MetricsRegionServer metricsRegionServer = server.getMetrics();
            if (metricsRegionServer != null) {
              long blockBytesScanned =
                context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
              metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
                blockBytesScanned);
            }
          }
        } else if (action.hasServiceCall()) {
          hasResultOrException = true;
          Message result = execServiceOnRegion(region, action.getServiceCall());
          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
            ClientProtos.CoprocessorServiceResult.newBuilder();
          resultOrExceptionBuilder.setServiceResult(serviceResultBuilder
            .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName())
              // TODO: Copy!!!
              .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
        } else if (action.hasMutation()) {
          MutationType type = action.getMutation().getMutateType();
          if (
            type != MutationType.PUT && type != MutationType.DELETE && mutations != null
              && !mutations.isEmpty()
          ) {
            // Flush out any Puts or Deletes already collected.
            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
              spaceQuotaEnforcement);
            mutations.clear();
          }
          switch (type) {
            case APPEND:
              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement, context);
              break;
            case INCREMENT:
              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement, context);
              break;
            case PUT:
            case DELETE:
              // Collect the individual mutations and apply in a batch
              if (mutations == null) {
                mutations = new ArrayList<>(actions.getActionCount());
              }
              mutations.add(action);
              break;
            default:
              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
          }
        } else {
          throw new HBaseIOException("Unexpected Action type");
        }
        if (r != null) {
          ClientProtos.Result pbResult = null;
          if (isClientCellBlockSupport(context)) {
            pbResult = ProtobufUtil.toResultNoData(r);
            // Hard to guess the size here. Just make a rough guess.
            if (cellsToReturn == null) {
              cellsToReturn = new ArrayList<>();
            }
            cellsToReturn.add(r);
          } else {
            pbResult = ProtobufUtil.toResult(r);
          }
          addSize(context, r);
          hasResultOrException = true;
          resultOrExceptionBuilder.setResult(pbResult);
        }
        // Could get to here and there was no result and no exception. Presumes we added
        // a Put or Delete to the collecting Mutations List for adding later. In this
        // case the corresponding ResultOrException instance for the Put or Delete will be added
        // down in the doNonAtomicBatchOp method call rather than up here.
      } catch (IOException ie) {
        rpcServer.getMetrics().exception(ie);
        hasResultOrException = true;
        NameBytesPair pair = ResponseConverter.buildException(ie);
        resultOrExceptionBuilder.setException(pair);
        context.incrementResponseExceptionSize(pair.getSerializedSize());
      }
      if (hasResultOrException) {
        // Propagate index.
        resultOrExceptionBuilder.setIndex(action.getIndex());
        builder.addResultOrException(resultOrExceptionBuilder.build());
      }
    }
    // Finish up any outstanding mutations
    if (!CollectionUtils.isEmpty(mutations)) {
      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
    }
    return cellsToReturn;
  }

  /**
   * Reject the mutation if any single cell exceeds the region's configured max cell size.
   * @throws DoNotRetryIOException if a cell's estimated serialized size exceeds the limit
   */
  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
    if (r.maxCellSize > 0) {
      CellScanner cells = m.cellScanner();
      while (cells.advance()) {
        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
        if (size > r.maxCellSize) {
          String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of "
            + r.maxCellSize + " bytes";
          LOG.debug(msg);
          throw new DoNotRetryIOException(msg);
        }
      }
    }
  }

  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Just throw the exception. The exception will be caught and then added to region-level
    // exception for RegionAction. Leaving the null to action result is ok since the null
    // result is viewed as failure by hbase client. And the region-level exception will be used
    // to replace the null result. see AsyncRequestFutureImpl#receiveMultiAction and
    // AsyncBatchRpcRetryingCaller#onComplete for more details.
919 doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true); 920 } 921 922 private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, 923 final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells, 924 ActivePolicyEnforcement spaceQuotaEnforcement) { 925 try { 926 doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE, 927 spaceQuotaEnforcement, false); 928 } catch (IOException e) { 929 // Set the exception for each action. The mutations in same RegionAction are group to 930 // different batch and then be processed individually. Hence, we don't set the region-level 931 // exception here for whole RegionAction. 932 for (Action mutation : mutations) { 933 builder.addResultOrException(getResultOrException(e, mutation.getIndex())); 934 } 935 } 936 } 937 938 /** 939 * Execute a list of mutations. 940 */ 941 private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region, 942 final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells, 943 long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic) 944 throws IOException { 945 Mutation[] mArray = new Mutation[mutations.size()]; 946 long before = EnvironmentEdgeManager.currentTime(); 947 boolean batchContainsPuts = false, batchContainsDelete = false; 948 try { 949 /** 950 * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions 951 * since mutation array may have been reoredered.In order to return the right result or 952 * exception to the corresponding actions, We need to know which action is the mutation belong 953 * to. We can't sort ClientProtos.Action array, since they are bonded to cellscanners. 
954 */ 955 Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>(); 956 int i = 0; 957 long nonce = HConstants.NO_NONCE; 958 for (ClientProtos.Action action : mutations) { 959 if (action.hasGet()) { 960 throw new DoNotRetryIOException( 961 "Atomic put and/or delete only, not a Get=" + action.getGet()); 962 } 963 MutationProto m = action.getMutation(); 964 Mutation mutation; 965 switch (m.getMutateType()) { 966 case PUT: 967 mutation = ProtobufUtil.toPut(m, cells); 968 batchContainsPuts = true; 969 break; 970 971 case DELETE: 972 mutation = ProtobufUtil.toDelete(m, cells); 973 batchContainsDelete = true; 974 break; 975 976 case INCREMENT: 977 mutation = ProtobufUtil.toIncrement(m, cells); 978 nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE; 979 break; 980 981 case APPEND: 982 mutation = ProtobufUtil.toAppend(m, cells); 983 nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE; 984 break; 985 986 default: 987 throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType()); 988 } 989 mutationActionMap.put(mutation, action); 990 mArray[i++] = mutation; 991 checkCellSizeLimit(region, mutation); 992 // Check if a space quota disallows this mutation 993 spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation); 994 quota.addMutation(mutation); 995 } 996 997 if (!region.getRegionInfo().isMetaRegion()) { 998 server.getMemStoreFlusher().reclaimMemStoreMemory(); 999 } 1000 1001 // HBASE-17924 1002 // Sort to improve lock efficiency for non-atomic batch of operations. If atomic 1003 // order is preserved as its expected from the client 1004 if (!atomic) { 1005 Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2)); 1006 } 1007 1008 OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce); 1009 1010 // When atomic is true, it indicates that the mutateRow API or the batch API with 1011 // RowMutations is called. 
In this case, we need to merge the results of the 1012 // Increment/Append operations if the mutations include those operations, and set the merged 1013 // result to the first element of the ResultOrException list 1014 if (atomic) { 1015 List<ResultOrException> resultOrExceptions = new ArrayList<>(); 1016 List<Result> results = new ArrayList<>(); 1017 for (i = 0; i < codes.length; i++) { 1018 if (codes[i].getResult() != null) { 1019 results.add(codes[i].getResult()); 1020 } 1021 if (i != 0) { 1022 resultOrExceptions 1023 .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i)); 1024 } 1025 } 1026 1027 if (results.isEmpty()) { 1028 builder.addResultOrException( 1029 getResultOrException(ClientProtos.Result.getDefaultInstance(), 0)); 1030 } else { 1031 // Merge the results of the Increment/Append operations 1032 List<Cell> cellList = new ArrayList<>(); 1033 for (Result result : results) { 1034 if (result.rawCells() != null) { 1035 cellList.addAll(Arrays.asList(result.rawCells())); 1036 } 1037 } 1038 Result result = Result.create(cellList); 1039 1040 // Set the merged result of the Increment/Append operations to the first element of the 1041 // ResultOrException list 1042 builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0)); 1043 } 1044 1045 builder.addAllResultOrException(resultOrExceptions); 1046 return; 1047 } 1048 1049 for (i = 0; i < codes.length; i++) { 1050 Mutation currentMutation = mArray[i]; 1051 ClientProtos.Action currentAction = mutationActionMap.get(currentMutation); 1052 int index = currentAction.hasIndex() ? 
currentAction.getIndex() : i; 1053 Exception e; 1054 switch (codes[i].getOperationStatusCode()) { 1055 case BAD_FAMILY: 1056 e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg()); 1057 builder.addResultOrException(getResultOrException(e, index)); 1058 break; 1059 1060 case SANITY_CHECK_FAILURE: 1061 e = new FailedSanityCheckException(codes[i].getExceptionMsg()); 1062 builder.addResultOrException(getResultOrException(e, index)); 1063 break; 1064 1065 default: 1066 e = new DoNotRetryIOException(codes[i].getExceptionMsg()); 1067 builder.addResultOrException(getResultOrException(e, index)); 1068 break; 1069 1070 case SUCCESS: 1071 ClientProtos.Result result = codes[i].getResult() == null 1072 ? ClientProtos.Result.getDefaultInstance() 1073 : ProtobufUtil.toResult(codes[i].getResult()); 1074 builder.addResultOrException(getResultOrException(result, index)); 1075 break; 1076 1077 case STORE_TOO_BUSY: 1078 e = new RegionTooBusyException(codes[i].getExceptionMsg()); 1079 builder.addResultOrException(getResultOrException(e, index)); 1080 break; 1081 } 1082 } 1083 } finally { 1084 int processedMutationIndex = 0; 1085 for (Action mutation : mutations) { 1086 // The non-null mArray[i] means the cell scanner has been read. 
1087 if (mArray[processedMutationIndex++] == null) { 1088 skipCellsForMutation(mutation, cells); 1089 } 1090 } 1091 updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete); 1092 } 1093 } 1094 1095 private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts, 1096 boolean batchContainsDelete) { 1097 final MetricsRegionServer metricsRegionServer = server.getMetrics(); 1098 if (metricsRegionServer != null) { 1099 long after = EnvironmentEdgeManager.currentTime(); 1100 if (batchContainsPuts) { 1101 metricsRegionServer.updatePutBatch(region, after - starttime); 1102 } 1103 if (batchContainsDelete) { 1104 metricsRegionServer.updateDeleteBatch(region, after - starttime); 1105 } 1106 } 1107 } 1108 1109 /** 1110 * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of 1111 * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse. 1112 * @return an array of OperationStatus which internally contains the OperationStatusCode and the 1113 * exceptionMessage if any 1114 * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying 1115 * edits for secondary replicas any more, see 1116 * {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}. 
*/
  @Deprecated
  private OperationStatus[] doReplayBatchOp(final HRegion region,
    final List<MutationReplay> mutations, long replaySeqId) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
        MutationReplay m = it.next();

        if (m.getType() == MutationType.PUT) {
          batchContainsPuts = true;
        } else {
          batchContainsDelete = true;
        }

        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
        if (metaCells != null && !metaCells.isEmpty()) {
          for (Cell metaCell : metaCells) {
            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
            HRegion hRegion = region;
            if (compactionDesc != null) {
              // replay the compaction. Remove the files from stores only if we are the primary
              // region replica (thus own the files)
              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
                replaySeqId);
              continue;
            }
            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
            if (flushDesc != null && !isDefaultReplica) {
              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
              continue;
            }
            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
            if (regionEvent != null && !isDefaultReplica) {
              hRegion.replayWALRegionEventMarker(regionEvent);
              continue;
            }
            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
            if (bulkLoadEvent != null) {
              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
              continue;
            }
          }
          // This entry carried WAL meta markers, handled above; drop it from the batch before
          // replaying the remaining data mutations.
          it.remove();
        }
      }
      requestCount.increment();
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
        replaySeqId);
    } finally {
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  private void closeAllScanners() {
    // Close any outstanding scanners. Means they'll get an UnknownScanner
    // exception next time they come in.
    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
      try {
        e.getValue().s.close();
      } catch (IOException ioe) {
        LOG.warn("Closing scanner " + e.getKey(), ioe);
      }
    }
  }

  // Directly invoked only for testing
  public RSRpcServices(final HRegionServer rs) throws IOException {
    super(rs, rs.getProcessName());
    final Configuration conf = rs.getConfiguration();
    setReloadableGuardrails(conf);
    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
    rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
    // Closed-scanner ids are remembered for one lease period so late calls can be told the
    // scanner was closed rather than unknown.
    closedScanners = CacheBuilder.newBuilder()
      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }

  @Override
  protected boolean defaultReservoirEnabled() {
    return true;
  }

  @Override
  protected ServerType getDNSServerType() {
    return DNS.ServerType.REGIONSERVER;
  }

  @Override
  protected String getHostname(Configuration conf, String defaultHostname) {
    return conf.get("hbase.regionserver.ipc.address", defaultHostname);
  }

  @Override
  protected String getPortConfigName() {
    return HConstants.REGIONSERVER_PORT;
  }

  @Override
  protected int getDefaultPort() {
    return HConstants.DEFAULT_REGIONSERVER_PORT;
  }

  @Override
  protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  protected RpcServerInterface createRpcServer(final Server server,
final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
    final String name) throws IOException {
    final Configuration conf = server.getConfiguration();
    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
    try {
      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, // use final
                                                                                        // bindAddress
                                                                                        // for this
                                                                                        // server.
        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
    } catch (BindException be) {
      throw new IOException(be.getMessage() + ". To switch ports use the '"
        + HConstants.REGIONSERVER_PORT + "' configuration property.",
        be.getCause() != null ? be.getCause() : be);
    }
  }

  protected Class<?> getRpcSchedulerFactoryClass() {
    final Configuration conf = server.getConfiguration();
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  protected PriorityFunction createPriority() {
    return new RSAnnotationReadingPriorityFunction(this);
  }

  public int getScannersCount() {
    return scanners.size();
  }

  /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
  RegionScanner getScanner(long scannerId) {
    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
    return rsh == null ? null : rsh.s;
  }

  /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null. */
  private RegionScannerHolder getRegionScannerHolder(long scannerId) {
    return scanners.get(toScannerName(scannerId));
  }

  /** Returns a human-readable "table/region/operation_id" summary for the scanner, or null. */
  public String getScanDetailsWithId(long scannerId) {
    RegionScanner scanner = getScanner(scannerId);
    if (scanner == null) {
      return null;
    }
    StringBuilder builder = new StringBuilder();
    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
    builder.append(" operation_id: ").append(scanner.getOperationId());
    return builder.toString();
  }

  /** Returns a human-readable summary for the scan request, or null if it cannot be resolved. */
  public String getScanDetailsWithRequest(ScanRequest request) {
    try {
      if (!request.hasRegion()) {
        return null;
      }
      Region region = getRegion(request.getRegion());
      StringBuilder builder = new StringBuilder();
      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
      for (NameBytesPair pair : request.getScan().getAttributeList()) {
        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
          break;
        }
      }
      return builder.toString();
    } catch (IOException ignored) {
      // Best-effort diagnostics only; the region may not be served here anymore.
      return null;
    }
  }

  /**
   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
   */
  long getScannerVirtualTime(long scannerId) {
    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
    return rsh == null ? 0L : rsh.getNextCallSeq();
  }

  /**
   * Method to account for the size of retained cells. Adds the estimated serialized size of every
   * cell in <code>r</code> to the context's response-cell-size accounting.
   * @param context rpc call context
   * @param r       result to add size.
   */
  void addSize(RpcCallContext context, Result r) {
    if (context != null && r != null && !r.isEmpty()) {
      for (Cell c : r.rawCells()) {
        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
      }
    }
  }

  /** Returns Remote client's ip and port else null if can't be determined. */
  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
  static String getRemoteClientIpAndPort() {
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    if (rpcCall == null) {
      return HConstants.EMPTY_STRING;
    }
    InetAddress address = rpcCall.getRemoteAddress();
    if (address == null) {
      return HConstants.EMPTY_STRING;
    }
    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
    // scanning than a hostname anyways.
    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
  }

  /** Returns Remote client's username.
*/ 1353 @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices", 1354 link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java") 1355 static String getUserName() { 1356 RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null); 1357 if (rpcCall == null) { 1358 return HConstants.EMPTY_STRING; 1359 } 1360 return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING); 1361 } 1362 1363 private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper, 1364 HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException { 1365 Lease lease = server.getLeaseManager().createLease(scannerName, this.scannerLeaseTimeoutPeriod, 1366 new ScannerListener(scannerName)); 1367 RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease); 1368 RpcCallback closeCallback = 1369 s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s); 1370 RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback, 1371 needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName()); 1372 RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh); 1373 assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! 
" 1374 + scannerName + ", " + existing; 1375 return rsh; 1376 } 1377 1378 private boolean isFullRegionScan(Scan scan, HRegion region) { 1379 // If the scan start row equals or less than the start key of the region 1380 // and stop row greater than equals end key (if stop row present) 1381 // or if the stop row is empty 1382 // account this as a full region scan 1383 if ( 1384 Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0 1385 && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0 1386 && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW) 1387 || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) 1388 ) { 1389 return true; 1390 } 1391 return false; 1392 } 1393 1394 /** 1395 * Find the HRegion based on a region specifier 1396 * @param regionSpecifier the region specifier 1397 * @return the corresponding region 1398 * @throws IOException if the specifier is not null, but failed to find the region 1399 */ 1400 public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException { 1401 return server.getRegion(regionSpecifier.getValue().toByteArray()); 1402 } 1403 1404 /** 1405 * Find the List of HRegions based on a list of region specifiers 1406 * @param regionSpecifiers the list of region specifiers 1407 * @return the corresponding list of regions 1408 * @throws IOException if any of the specifiers is not null, but failed to find the region 1409 */ 1410 private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers, 1411 final CacheEvictionStatsBuilder stats) { 1412 List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size()); 1413 for (RegionSpecifier regionSpecifier : regionSpecifiers) { 1414 try { 1415 regions.add(server.getRegion(regionSpecifier.getValue().toByteArray())); 1416 } catch (NotServingRegionException e) { 1417 stats.addException(regionSpecifier.getValue().toByteArray(), e); 1418 } 1419 } 1420 return regions; 1421 } 
PriorityFunction getPriority() {
    return priority;
  }

  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return server.getRegionServerRpcQuotaManager();
  }

  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return server.getRegionServerSpaceQuotaManager();
  }

  void start(ZKWatcher zkWatcher) {
    this.scannerIdGenerator = new ScannerIdGenerator(this.server.getServerName());
    internalStart(zkWatcher);
  }

  void stop() {
    closeAllScanners();
    internalStop();
  }

  /**
   * Called to verify that this server is up and running. Throws the appropriate
   * region-server-level exception if the server is aborted, stopped, has a bad data filesystem,
   * or is not yet online.
   */
  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
  protected void checkOpen() throws IOException {
    if (server.isAborted()) {
      throw new RegionServerAbortedException("Server " + server.getServerName() + " aborting");
    }
    if (server.isStopped()) {
      throw new RegionServerStoppedException("Server " + server.getServerName() + " stopping");
    }
    if (!server.isDataFileSystemOk()) {
      throw new RegionServerStoppedException("File system not available");
    }
    if (!server.isOnline()) {
      throw new ServerNotRunningYetException(
        "Server " + server.getServerName() + " is not running yet");
    }
  }

  /**
   * By default, put up an Admin and a Client Service. Set booleans
   * <code>hbase.regionserver.admin.executorService</code> and
   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
   * Default is that both are enabled.
1470 * @return immutable list of blocking services and the security info classes that this server 1471 * supports 1472 */ 1473 protected List<BlockingServiceAndInterface> getServices() { 1474 boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true); 1475 boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true); 1476 boolean clientMeta = 1477 getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true); 1478 boolean bootstrapNodes = 1479 getConfiguration().getBoolean(REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG, true); 1480 List<BlockingServiceAndInterface> bssi = new ArrayList<>(); 1481 if (client) { 1482 bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this), 1483 ClientService.BlockingInterface.class)); 1484 } 1485 if (admin) { 1486 bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this), 1487 AdminService.BlockingInterface.class)); 1488 } 1489 if (clientMeta) { 1490 bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this), 1491 ClientMetaService.BlockingInterface.class)); 1492 } 1493 if (bootstrapNodes) { 1494 bssi.add( 1495 new BlockingServiceAndInterface(BootstrapNodeService.newReflectiveBlockingService(this), 1496 BootstrapNodeService.BlockingInterface.class)); 1497 } 1498 return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build(); 1499 } 1500 1501 /** 1502 * Close a region on the region server. 1503 * @param controller the RPC controller 1504 * @param request the request 1505 */ 1506 @Override 1507 @QosPriority(priority = HConstants.ADMIN_QOS) 1508 public CloseRegionResponse closeRegion(final RpcController controller, 1509 final CloseRegionRequest request) throws ServiceException { 1510 final ServerName sn = (request.hasDestinationServer() 1511 ? 
ProtobufUtil.toServerName(request.getDestinationServer()) 1512 : null); 1513 1514 try { 1515 checkOpen(); 1516 throwOnWrongStartCode(request); 1517 final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion()); 1518 1519 requestCount.increment(); 1520 if (sn == null) { 1521 LOG.info("Close " + encodedRegionName + " without moving"); 1522 } else { 1523 LOG.info("Close " + encodedRegionName + ", moving to " + sn); 1524 } 1525 boolean closed = server.closeRegion(encodedRegionName, false, sn); 1526 CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed); 1527 return builder.build(); 1528 } catch (IOException ie) { 1529 throw new ServiceException(ie); 1530 } 1531 } 1532 1533 /** 1534 * Compact a region on the region server. 1535 * @param controller the RPC controller 1536 * @param request the request 1537 */ 1538 @Override 1539 @QosPriority(priority = HConstants.ADMIN_QOS) 1540 public CompactRegionResponse compactRegion(final RpcController controller, 1541 final CompactRegionRequest request) throws ServiceException { 1542 try { 1543 checkOpen(); 1544 requestCount.increment(); 1545 HRegion region = getRegion(request.getRegion()); 1546 // Quota support is enabled, the requesting user is not system/super user 1547 // and a quota policy is enforced that disables compactions. 
1548 if ( 1549 QuotaUtil.isQuotaEnabled(getConfiguration()) 1550 && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null)) 1551 && this.server.getRegionServerSpaceQuotaManager() 1552 .areCompactionsDisabled(region.getTableDescriptor().getTableName()) 1553 ) { 1554 throw new DoNotRetryIOException( 1555 "Compactions on this region are " + "disabled due to a space quota violation."); 1556 } 1557 region.startRegionOperation(Operation.COMPACT_REGION); 1558 LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString()); 1559 boolean major = request.hasMajor() && request.getMajor(); 1560 if (request.hasFamily()) { 1561 byte[] family = request.getFamily().toByteArray(); 1562 String log = "User-triggered " + (major ? "major " : "") + "compaction for region " 1563 + region.getRegionInfo().getRegionNameAsString() + " and family " 1564 + Bytes.toString(family); 1565 LOG.trace(log); 1566 region.requestCompaction(family, log, Store.PRIORITY_USER, major, 1567 CompactionLifeCycleTracker.DUMMY); 1568 } else { 1569 String log = "User-triggered " + (major ? 
"major " : "") + "compaction for region " 1570 + region.getRegionInfo().getRegionNameAsString(); 1571 LOG.trace(log); 1572 region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY); 1573 } 1574 return CompactRegionResponse.newBuilder().build(); 1575 } catch (IOException ie) { 1576 throw new ServiceException(ie); 1577 } 1578 } 1579 1580 @Override 1581 @QosPriority(priority = HConstants.ADMIN_QOS) 1582 public CompactionSwitchResponse compactionSwitch(RpcController controller, 1583 CompactionSwitchRequest request) throws ServiceException { 1584 rpcPreCheck("compactionSwitch"); 1585 final CompactSplit compactSplitThread = server.getCompactSplitThread(); 1586 requestCount.increment(); 1587 boolean prevState = compactSplitThread.isCompactionsEnabled(); 1588 CompactionSwitchResponse response = 1589 CompactionSwitchResponse.newBuilder().setPrevState(prevState).build(); 1590 if (prevState == request.getEnabled()) { 1591 // passed in requested state is same as current state. No action required 1592 return response; 1593 } 1594 compactSplitThread.switchCompaction(request.getEnabled()); 1595 return response; 1596 } 1597 1598 /** 1599 * Flush a region on the region server. 
1600 * @param controller the RPC controller 1601 * @param request the request 1602 */ 1603 @Override 1604 @QosPriority(priority = HConstants.ADMIN_QOS) 1605 public FlushRegionResponse flushRegion(final RpcController controller, 1606 final FlushRegionRequest request) throws ServiceException { 1607 try { 1608 checkOpen(); 1609 requestCount.increment(); 1610 HRegion region = getRegion(request.getRegion()); 1611 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); 1612 boolean shouldFlush = true; 1613 if (request.hasIfOlderThanTs()) { 1614 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs(); 1615 } 1616 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); 1617 if (shouldFlush) { 1618 boolean writeFlushWalMarker = 1619 request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false; 1620 // Go behind the curtain so we can manage writing of the flush WAL marker 1621 HRegion.FlushResultImpl flushResult = null; 1622 if (request.hasFamily()) { 1623 List families = new ArrayList(); 1624 families.add(request.getFamily().toByteArray()); 1625 flushResult = 1626 region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1627 } else { 1628 flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1629 } 1630 boolean compactionNeeded = flushResult.isCompactionNeeded(); 1631 if (compactionNeeded) { 1632 server.getCompactSplitThread().requestSystemCompaction(region, 1633 "Compaction through user triggered flush"); 1634 } 1635 builder.setFlushed(flushResult.isFlushSucceeded()); 1636 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker); 1637 } 1638 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores()); 1639 return builder.build(); 1640 } catch (DroppedSnapshotException ex) { 1641 // Cache flush can fail in a few places. 
If it fails in a critical 1642 // section, we get a DroppedSnapshotException and a replay of wal 1643 // is required. Currently the only way to do this is a restart of 1644 // the server. 1645 server.abort("Replay of WAL required. Forcing server shutdown", ex); 1646 throw new ServiceException(ex); 1647 } catch (IOException ie) { 1648 throw new ServiceException(ie); 1649 } 1650 } 1651 1652 @Override 1653 @QosPriority(priority = HConstants.ADMIN_QOS) 1654 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, 1655 final GetOnlineRegionRequest request) throws ServiceException { 1656 try { 1657 checkOpen(); 1658 requestCount.increment(); 1659 Map<String, HRegion> onlineRegions = server.getOnlineRegions(); 1660 List<RegionInfo> list = new ArrayList<>(onlineRegions.size()); 1661 for (HRegion region : onlineRegions.values()) { 1662 list.add(region.getRegionInfo()); 1663 } 1664 list.sort(RegionInfo.COMPARATOR); 1665 return ResponseConverter.buildGetOnlineRegionResponse(list); 1666 } catch (IOException ie) { 1667 throw new ServiceException(ie); 1668 } 1669 } 1670 1671 // Master implementation of this Admin Service differs given it is not 1672 // able to supply detail only known to RegionServer. See note on 1673 // MasterRpcServers#getRegionInfo. 
  /**
   * Return region info plus (optionally) the best split row and compaction state. See the note
   * above: the Master's implementation of this Admin call differs.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
    final GetRegionInfoRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      RegionInfo info = region.getRegionInfo();
      byte[] bestSplitRow;
      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
        bestSplitRow = region.checkSplit(true).orElse(null);
        // when all table data are in memstore, bestSplitRow = null
        // try to flush region first
        if (bestSplitRow == null) {
          region.flush(true);
          bestSplitRow = region.checkSplit(true).orElse(null);
        }
      } else {
        bestSplitRow = null;
      }
      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
      if (request.hasCompactionState() && request.getCompactionState()) {
        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
      }
      builder.setSplittable(region.isSplittable());
      builder.setMergeable(region.isMergeable());
      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Report RegionLoad for either all online regions or, when the request names a table, only the
   * regions of that table.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
    throws ServiceException {

    List<HRegion> regions;
    if (request.hasTableName()) {
      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
      regions = server.getRegions(tableName);
    } else {
      regions = server.getRegions();
    }
    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
    // Builders are reused across regions; createRegionLoad presumably clears them per region --
    // TODO confirm.
    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();

    try {
      for (HRegion region : regions) {
        rLoads.add(server.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
      }
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
    builder.addAllRegionLoads(rLoads);
    return builder.build();
  }

  /**
   * Clear the named compaction queues ("long" and/or "short"). Only one caller may clear at a
   * time; a concurrent request is logged and ignored (the CAS on clearCompactionQueues guards
   * this).
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
    ClearCompactionQueuesRequest request) throws ServiceException {
    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
      + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
    requestCount.increment();
    if (clearCompactionQueues.compareAndSet(false, true)) {
      final CompactSplit compactSplitThread = server.getCompactSplitThread();
      try {
        checkOpen();
        server.getRegionServerCoprocessorHost().preClearCompactionQueues();
        for (String queueName : request.getQueueNameList()) {
          LOG.debug("clear " + queueName + " compaction queue");
          switch (queueName) {
            case "long":
              compactSplitThread.clearLongCompactionsQueue();
              break;
            case "short":
              compactSplitThread.clearShortCompactionsQueue();
              break;
            default:
              LOG.warn("Unknown queue name " + queueName);
              throw new IOException("Unknown queue name " + queueName);
          }
        }
        server.getRegionServerCoprocessorHost().postClearCompactionQueues();
      } catch (IOException ie) {
        throw new ServiceException(ie);
      } finally {
        // Always release the guard flag so later requests can proceed.
        clearCompactionQueues.set(false);
      }
    } else {
      LOG.warn("Clear compactions queue is executing by other admin.");
    }
    return respBuilder.build();
  }

  /**
   * Get some information of the region server.
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetServerInfoResponse getServerInfo(final RpcController controller,
    final GetServerInfoRequest request) throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
    requestCount.increment();
    // -1 signals "no info server running".
    int infoPort = server.getInfoServer() != null ? server.getInfoServer().getPort() : -1;
    return ResponseConverter.buildGetServerInfoResponse(server.getServerName(), infoPort);
  }

  /**
   * List store file names for the requested region, restricted to the requested column families
   * (all families when none are given).
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetStoreFileResponse getStoreFile(final RpcController controller,
    final GetStoreFileRequest request) throws ServiceException {
    try {
      checkOpen();
      HRegion region = getRegion(request.getRegion());
      requestCount.increment();
      Set<byte[]> columnFamilies;
      if (request.getFamilyCount() == 0) {
        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
      } else {
        // Byte-wise comparator so byte[] keys compare by content, not identity.
        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
        for (ByteString cf : request.getFamilyList()) {
          columnFamilies.add(cf.toByteArray());
        }
      }
      int nCF = columnFamilies.size();
      List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
      builder.addAllStoreFile(fileList);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /** Validate the start code carried by an OpenRegionRequest, if present. */
  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  /** Validate the start code carried by a CloseRegionRequest, if present. */
  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  /**
   * Reject RPCs addressed to an earlier incarnation of this server (same host/port, different
   * start code), to avoid acting on stale instructions.
   */
  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
    // check that we are the same server that this RPC is intended for.
    if (server.getServerName().getStartcode() != serverStartCode) {
      throw new ServiceException(new DoNotRetryIOException(
        "This RPC was intended for a " + "different server with startCode: " + serverStartCode
          + ", this server is: " + server.getServerName()));
    }
  }

  /** Validate start codes for every open/close sub-request of an ExecuteProceduresRequest. */
  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
    if (req.getOpenRegionCount() > 0) {
      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
        throwOnWrongStartCode(openReq);
      }
    }
    if (req.getCloseRegionCount() > 0) {
      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
        throwOnWrongStartCode(closeReq);
      }
    }
  }

  /**
   * Open asynchronously a region or a set of regions on the region server. The opening is
   * coordinated by ZooKeeper, and this method requires the znode to be created before being called.
   * As a consequence, this method should be called only from the master.
   * <p>
   * Different manages states for the region are:
   * </p>
   * <ul>
   * <li>region not opened: the region opening will start asynchronously.</li>
   * <li>a close is already in progress: this is considered as an error.</li>
   * <li>an open is already in progress: this new open request will be ignored. This is important
   * because the Master can do multiple requests if it crashes.</li>
   * <li>the region is already opened: this new open request will be ignored.</li>
   * </ul>
   * <p>
   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
   * errors are put in the response as FAILED_OPENING.
   * </p>
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public OpenRegionResponse openRegion(final RpcController controller,
    final OpenRegionRequest request) throws ServiceException {
    requestCount.increment();
    throwOnWrongStartCode(request);

    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
    final int regionCount = request.getOpenInfoCount();
    // Cache table descriptors so a bulk assign of many regions of one table loads it once.
    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
    final boolean isBulkAssign = regionCount > 1;
    try {
      checkOpen();
    } catch (IOException ie) {
      TableName tableName = null;
      if (regionCount == 1) {
        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
          request.getOpenInfo(0).getRegion();
        if (ri != null) {
          tableName = ProtobufUtil.toTableName(ri.getTableName());
        }
      }
      if (!TableName.META_TABLE_NAME.equals(tableName)) {
        throw new ServiceException(ie);
      }
      // We are assigning meta, wait a little for regionserver to finish initialization.
      // Default to quarter of RPC timeout
      int timeout = server.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
      synchronized (server.online) {
        try {
          while (
            EnvironmentEdgeManager.currentTime() <= endTime && !server.isStopped()
              && !server.isOnline()
          ) {
            server.online.wait(server.getMsgInterval());
          }
          checkOpen();
        } catch (InterruptedException t) {
          // Restore the interrupt status before surfacing the failure.
          Thread.currentThread().interrupt();
          throw new ServiceException(t);
        } catch (IOException e) {
          throw new ServiceException(e);
        }
      }
    }

    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;

    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableDescriptor htd;
      try {
        String encodedName = region.getEncodedName();
        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
        final HRegion onlineRegion = server.getRegion(encodedName);
        if (onlineRegion != null) {
          // The region is already online. This should not happen any more.
          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
            + ", which is already online";
          LOG.warn(error);
          // server.abort(error);
          // throw new IOException(error);
          builder.addOpeningState(RegionOpeningState.OPENED);
          continue;
        }
        LOG.info("Open " + region.getRegionNameAsString());

        // Atomically mark the region as in-transition; 'previous' tells us what, if anything,
        // was already happening to it on this server.
        final Boolean previous =
          server.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);

        if (Boolean.FALSE.equals(previous)) {
          if (server.getRegion(encodedName) != null) {
            // There is a close in progress. This should not happen any more.
            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
              + ", which we are already trying to CLOSE";
            server.abort(error);
            throw new IOException(error);
          }
          server.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
        }

        if (Boolean.TRUE.equals(previous)) {
          // An open is in progress. This is supported, but let's log this.
          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
            + ", which we are already trying to OPEN"
            + " - ignoring this new request for this region.");
        }

        // We are opening this region. If it moves back and forth for whatever reason, we don't
        // want to keep returning the stale moved record while we are opening/if we close again.
        server.removeFromMovedRegions(region.getEncodedName());

        if (previous == null || !previous.booleanValue()) {
          htd = htds.get(region.getTable());
          if (htd == null) {
            htd = server.getTableDescriptors().get(region.getTable());
            htds.put(region.getTable(), htd);
          }
          if (htd == null) {
            throw new IOException("Missing table descriptor for " + region.getEncodedName());
          }
          // If there is no action in progress, we can submit a specific handler.
          // Need to pass the expected version in the constructor.
          if (server.getExecutorService() == null) {
            LOG.info("No executor executorService; skipping open request");
          } else {
            if (region.isMetaRegion()) {
              server.getExecutorService()
                .submit(new OpenMetaHandler(server, server, region, htd, masterSystemTime));
            } else {
              if (regionOpenInfo.getFavoredNodesCount() > 0) {
                server.updateRegionFavoredNodesMapping(region.getEncodedName(),
                  regionOpenInfo.getFavoredNodesList());
              }
              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
                server.getExecutorService().submit(
                  new OpenPriorityRegionHandler(server, server, region, htd, masterSystemTime));
              } else {
                server.getExecutorService()
                  .submit(new OpenRegionHandler(server, server, region, htd, masterSystemTime));
              }
            }
          }
        }

        builder.addOpeningState(RegionOpeningState.OPENED);
      } catch (IOException ie) {
        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
        if (isBulkAssign) {
          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
        } else {
          throw new ServiceException(ie);
        }
      }
    }
    return builder.build();
  }

  /**
   * Warmup a region on this server. This method should only be called by Master. It synchronously
   * opens the region and closes the region bringing the most important pages in cache.
2027 */ 2028 @Override 2029 public WarmupRegionResponse warmupRegion(final RpcController controller, 2030 final WarmupRegionRequest request) throws ServiceException { 2031 final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo()); 2032 WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance(); 2033 try { 2034 checkOpen(); 2035 String encodedName = region.getEncodedName(); 2036 byte[] encodedNameBytes = region.getEncodedNameAsBytes(); 2037 final HRegion onlineRegion = server.getRegion(encodedName); 2038 if (onlineRegion != null) { 2039 LOG.info("{} is online; skipping warmup", region); 2040 return response; 2041 } 2042 TableDescriptor htd = server.getTableDescriptors().get(region.getTable()); 2043 if (server.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) { 2044 LOG.info("{} is in transition; skipping warmup", region); 2045 return response; 2046 } 2047 LOG.info("Warmup {}", region.getRegionNameAsString()); 2048 HRegion.warmupHRegion(region, htd, server.getWAL(region), server.getConfiguration(), server, 2049 null); 2050 } catch (IOException ie) { 2051 LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie); 2052 throw new ServiceException(ie); 2053 } 2054 2055 return response; 2056 } 2057 2058 private ExtendedCellScanner getAndReset(RpcController controller) { 2059 HBaseRpcController hrc = (HBaseRpcController) controller; 2060 ExtendedCellScanner cells = hrc.cellScanner(); 2061 hrc.setCellScanner(null); 2062 return cells; 2063 } 2064 2065 /** 2066 * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is 2067 * that the given mutations will be durable on the receiving RS if this method returns without any 2068 * exception. 2069 * @param controller the RPC controller 2070 * @param request the request 2071 * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for 2072 * compatibility with old region replica implementation. 
Now we will use 2073 * {@code replicateToReplica} method instead. 2074 */ 2075 @Deprecated 2076 @Override 2077 @QosPriority(priority = HConstants.REPLAY_QOS) 2078 public ReplicateWALEntryResponse replay(final RpcController controller, 2079 final ReplicateWALEntryRequest request) throws ServiceException { 2080 long before = EnvironmentEdgeManager.currentTime(); 2081 CellScanner cells = getAndReset(controller); 2082 try { 2083 checkOpen(); 2084 List<WALEntry> entries = request.getEntryList(); 2085 if (entries == null || entries.isEmpty()) { 2086 // empty input 2087 return ReplicateWALEntryResponse.newBuilder().build(); 2088 } 2089 ByteString regionName = entries.get(0).getKey().getEncodedRegionName(); 2090 HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8()); 2091 RegionCoprocessorHost coprocessorHost = 2092 ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo()) 2093 ? region.getCoprocessorHost() 2094 : null; // do not invoke coprocessors if this is a secondary region replica 2095 2096 // Skip adding the edits to WAL if this is a secondary region replica 2097 boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); 2098 Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL; 2099 2100 for (WALEntry entry : entries) { 2101 if (!regionName.equals(entry.getKey().getEncodedRegionName())) { 2102 throw new NotServingRegionException("Replay request contains entries from multiple " 2103 + "regions. First region:" + regionName.toStringUtf8() + " , other region:" 2104 + entry.getKey().getEncodedRegionName()); 2105 } 2106 if (server.nonceManager != null && isPrimary) { 2107 long nonceGroup = 2108 entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; 2109 long nonce = entry.getKey().hasNonce() ? 
entry.getKey().getNonce() : HConstants.NO_NONCE; 2110 server.nonceManager.reportOperationFromWal(nonceGroup, nonce, 2111 entry.getKey().getWriteTime()); 2112 } 2113 Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>(); 2114 List<MutationReplay> edits = 2115 WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability); 2116 if (edits != null && !edits.isEmpty()) { 2117 // HBASE-17924 2118 // sort to improve lock efficiency 2119 Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation)); 2120 long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) 2121 ? entry.getKey().getOrigSequenceNumber() 2122 : entry.getKey().getLogSequenceNumber(); 2123 OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId); 2124 // check if it's a partial success 2125 for (int i = 0; result != null && i < result.length; i++) { 2126 if (result[i] != OperationStatus.SUCCESS) { 2127 throw new IOException(result[i].getExceptionMsg()); 2128 } 2129 } 2130 } 2131 } 2132 2133 // sync wal at the end because ASYNC_WAL is used above 2134 WAL wal = region.getWAL(); 2135 if (wal != null) { 2136 wal.sync(); 2137 } 2138 return ReplicateWALEntryResponse.newBuilder().build(); 2139 } catch (IOException ie) { 2140 throw new ServiceException(ie); 2141 } finally { 2142 final MetricsRegionServer metricsRegionServer = server.getMetrics(); 2143 if (metricsRegionServer != null) { 2144 metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before); 2145 } 2146 } 2147 } 2148 2149 /** 2150 * Replay the given changes on a secondary replica 2151 */ 2152 @Override 2153 public ReplicateWALEntryResponse replicateToReplica(RpcController controller, 2154 ReplicateWALEntryRequest request) throws ServiceException { 2155 CellScanner cells = getAndReset(controller); 2156 try { 2157 checkOpen(); 2158 List<WALEntry> entries = request.getEntryList(); 2159 if (entries == null || entries.isEmpty()) { 2160 // empty input 2161 return 
ReplicateWALEntryResponse.newBuilder().build(); 2162 } 2163 ByteString regionName = entries.get(0).getKey().getEncodedRegionName(); 2164 HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8()); 2165 if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 2166 throw new DoNotRetryIOException( 2167 "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?"); 2168 } 2169 for (WALEntry entry : entries) { 2170 if (!regionName.equals(entry.getKey().getEncodedRegionName())) { 2171 throw new NotServingRegionException( 2172 "ReplicateToReplica request contains entries from multiple " + "regions. First region:" 2173 + regionName.toStringUtf8() + " , other region:" 2174 + entry.getKey().getEncodedRegionName()); 2175 } 2176 region.replayWALEntry(entry, cells); 2177 } 2178 return ReplicateWALEntryResponse.newBuilder().build(); 2179 } catch (IOException ie) { 2180 throw new ServiceException(ie); 2181 } 2182 } 2183 2184 private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException { 2185 ReplicationSourceService replicationSource = server.getReplicationSourceService(); 2186 if (replicationSource == null || entries.isEmpty()) { 2187 return; 2188 } 2189 // We can ensure that all entries are for one peer, so only need to check one entry's 2190 // table name. if the table hit sync replication at peer side and the peer cluster 2191 // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply 2192 // those entries according to the design doc. 
2193 TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray()); 2194 if ( 2195 replicationSource.getSyncReplicationPeerInfoProvider().checkState(table, 2196 RejectReplicationRequestStateChecker.get()) 2197 ) { 2198 throw new DoNotRetryIOException( 2199 "Reject to apply to sink cluster because sync replication state of sink cluster " 2200 + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table); 2201 } 2202 } 2203 2204 /** 2205 * Replicate WAL entries on the region server. 2206 * @param controller the RPC controller 2207 * @param request the request 2208 */ 2209 @Override 2210 @QosPriority(priority = HConstants.REPLICATION_QOS) 2211 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, 2212 final ReplicateWALEntryRequest request) throws ServiceException { 2213 try { 2214 checkOpen(); 2215 if (server.getReplicationSinkService() != null) { 2216 requestCount.increment(); 2217 List<WALEntry> entries = request.getEntryList(); 2218 checkShouldRejectReplicationRequest(entries); 2219 CellScanner cellScanner = getAndReset(controller); 2220 server.getRegionServerCoprocessorHost().preReplicateLogEntries(); 2221 server.getReplicationSinkService().replicateLogEntries(entries, cellScanner, 2222 request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), 2223 request.getSourceHFileArchiveDirPath()); 2224 server.getRegionServerCoprocessorHost().postReplicateLogEntries(); 2225 return ReplicateWALEntryResponse.newBuilder().build(); 2226 } else { 2227 throw new ServiceException("Replication services are not initialized yet"); 2228 } 2229 } catch (IOException ie) { 2230 throw new ServiceException(ie); 2231 } 2232 } 2233 2234 /** 2235 * Roll the WAL writer of the region server. 
* @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public RollWALWriterResponse rollWALWriter(final RpcController controller,
    final RollWALWriterRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      server.getRegionServerCoprocessorHost().preRollWALWriterRequest();
      server.getWalRoller().requestRollAll();
      server.getRegionServerCoprocessorHost().postRollWALWriterRequest();
      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Stop the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public StopServerResponse stopServer(final RpcController controller,
    final StopServerRequest request) throws ServiceException {
    rpcPreCheck("stopServer");
    requestCount.increment();
    String reason = request.getReason();
    server.stop(reason);
    return StopServerResponse.newBuilder().build();
  }

  /**
   * Updates the favored-nodes mapping for each region named in the request. Regions with no
   * favored nodes in the request are skipped; the response reports the number of update infos
   * received (not the number actually applied).
   */
  @Override
  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
    UpdateFavoredNodesRequest request) throws ServiceException {
    rpcPreCheck("updateFavoredNodes");
    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
        server.updateRegionFavoredNodesMapping(hri.getEncodedName(),
          regionUpdateInfo.getFavoredNodesList());
      }
    }
    respBuilder.setResponse(openInfoList.size());
    return respBuilder.build();
  }

  /**
   * Atomically bulk load several HFiles into an open region
   * @return true if successful, false is failed but recoverably (no action)
   * @throws ServiceException if failed unrecoverably
   */
  @Override
  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
    final BulkLoadHFileRequest request) throws ServiceException {
    long start = EnvironmentEdgeManager.currentTime();
    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
    if (clusterIds.contains(this.server.getClusterId())) {
      // This cluster is already in the list: the load has looped back to us, so report success
      // without re-applying it.
      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
    } else {
      clusterIds.add(this.server.getClusterId());
    }
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
      // Negative means "unknown / not computed"; see the quota-update guard below.
      long sizeToBeLoaded = -1;

      // Check to see if this bulk load would exceed the space quota for this table
      if (spaceQuotaEnabled) {
        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
        SpaceViolationPolicyEnforcement enforcement =
          activeSpaceQuotas.getPolicyEnforcement(region);
        if (enforcement != null) {
          // Bulk loads must still be atomic. We must enact all or none.
List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
          for (FamilyPath familyPath : request.getFamilyPathList()) {
            filePaths.add(familyPath.getPath());
          }
          // Check if the batch of files exceeds the current quota
          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
        }
      }
      // secure bulk load
      Map<byte[], List<Path>> map =
        server.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
      builder.setLoaded(map != null);
      if (map != null) {
        // Treat any negative size as a flag to "ignore" updating the region size as that is
        // not possible to occur in real life (cannot bulk load a file with negative size)
        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
              + sizeToBeLoaded + " bytes");
          }
          // Inform space quotas of the new files for this region
          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
            sizeToBeLoaded);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
      }
    }
  }

  /**
   * Prepares a secure bulk load for the given region and returns the bulk token the client must
   * present on subsequent bulk-load calls.
   */
  @Override
  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
    PrepareBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      String bulkToken = server.getSecureBulkLoadManager().prepareBulkLoad(region, request);
      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
      builder.setBulkToken(bulkToken);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /** Releases the secure-bulk-load resources associated with a finished bulk load. */
  @Override
  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
    CleanupBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      server.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
      return CleanupBulkLoadResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Executes a region coprocessor endpoint call and wraps the serialized result for the client.
   */
  @Override
  public CoprocessorServiceResponse execService(final RpcController controller,
    final CoprocessorServiceRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      Message result = execServiceOnRegion(region, request.getCall());
      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
        region.getRegionInfo().getRegionName()));
      // TODO: COPIES!!!!!!
builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
        org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Returns the filesystem backing the given bulk-load file paths, or this server's own
   * filesystem when the list is empty. Only the first path is inspected; all paths are assumed
   * to live on the same filesystem — TODO confirm with callers.
   */
  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
    if (filePaths.isEmpty()) {
      // local hdfs
      return server.getFileSystem();
    }
    // source hdfs
    return new Path(filePaths.get(0)).getFileSystem(server.getConfiguration());
  }

  /** Dispatches a coprocessor service call to the region with a fresh server-side controller. */
  private Message execServiceOnRegion(HRegion region,
    final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
    // ignore the passed in controller (from the serialized call)
    ServerRpcController execController = new ServerRpcController();
    return region.execService(execController, serviceCall);
  }

  /**
   * Returns true if client requests against this region should be rejected, i.e. the region's
   * table is in a sync-replication state where client access is not allowed (STANDBY).
   */
  private boolean shouldRejectRequestsFromClient(HRegion region) {
    TableName table = region.getRegionInfo().getTable();
    ReplicationSourceService service = server.getReplicationSourceService();
    return service != null && service.getSyncReplicationPeerInfoProvider().checkState(table,
      RejectRequestsFromClientStateChecker.get());
  }

  /** Throws if the region is in sync-replication STANDBY state; otherwise a no-op. */
  private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
    if (shouldRejectRequestsFromClient(region)) {
      throw new DoNotRetryIOException(
        region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state.");
    }
  }

  /**
   * Get data from a table.
   * @param controller the RPC controller
   * @param request    the get request
   */
  @Override
  public GetResponse get(final RpcController controller, final GetRequest request)
    throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    OperationQuota quota = null;
    HRegion region = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    try {
      checkOpen();
      requestCount.increment();
      rpcGetRequestCount.increment();
      region = getRegion(request.getRegion());
      rejectIfInStandByState(region);

      GetResponse.Builder builder = GetResponse.newBuilder();
      ClientProtos.Get get = request.getGet();
      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
          + "reverse Scan.");
      }
      Boolean existence = null;
      Result r = null;
      quota = getRpcQuotaManager().checkBatchQuota(region, OperationQuota.OperationType.GET);

      Get clientGet = ProtobufUtil.toGet(get);
      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
        existence = region.getCoprocessorHost().preExists(clientGet);
      }
      if (existence == null) {
        if (context != null) {
          r = get(clientGet, (region), null, context);
        } else {
          // for test purpose
          r = region.get(clientGet);
        }
        if (get.getExistenceOnly()) {
          boolean exists = r.getExists();
          if (region.getCoprocessorHost() != null) {
            exists = region.getCoprocessorHost().postExists(clientGet, exists);
          }
          existence = exists;
        }
      }
      if (existence != null) {
        ClientProtos.Result pbr =
          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
        builder.setResult(pbr);
      } else if (r != null) {
        ClientProtos.Result pbr;
        if (
          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
        ) {
          // Client understands cell blocks: ship cells out-of-band instead of in the protobuf.
          pbr = ProtobufUtil.toResultNoData(r);
          ((HBaseRpcController) controller).setCellScanner(PrivateCellUtil
            .createExtendedCellScanner(PackagePrivateFieldAccessor.getExtendedRawCells(r)));
          addSize(context, r);
        } else {
          pbr = ProtobufUtil.toResult(r);
        }
        builder.setResult(pbr);
      }
      // r.cells is null when a table.exists(get) call
      if (r != null && r.rawCells() != null) {
        quota.addGetResult(r);
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null && region != null) {
        long blockBytesScanned = context != null ? context.getBlockBytesScanned() : 0;
        metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
          blockBytesScanned);
      }
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Performs a Get as a single-row Scan, registering the created scanner with the RPC call
   * context (or the supplied close callback) so it is closed when the call completes.
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
    RpcCallContext context) throws IOException {
    region.prepareGet(get);
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        region.metricsUpdateForGet();
        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
          stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpc callback will close all such scanners created as part
          // of multi().
2567 closeCallBack.addScanner(scanner); 2568 } 2569 } 2570 } 2571 2572 // post-get CP hook 2573 if (region.getCoprocessorHost() != null) { 2574 region.getCoprocessorHost().postGet(get, results); 2575 } 2576 region.metricsUpdateForGet(); 2577 2578 return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); 2579 } 2580 2581 private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException { 2582 int sum = 0; 2583 String firstRegionName = null; 2584 for (RegionAction regionAction : request.getRegionActionList()) { 2585 if (sum == 0) { 2586 firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray()); 2587 } 2588 sum += regionAction.getActionCount(); 2589 } 2590 if (sum > rowSizeWarnThreshold) { 2591 LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold 2592 + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: " 2593 + RpcServer.getRequestUserName().orElse(null) + "/" 2594 + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName); 2595 if (rejectRowsWithSizeOverThreshold) { 2596 throw new ServiceException( 2597 "Rejecting large batch operation for current batch with firstRegionName: " 2598 + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: " 2599 + rowSizeWarnThreshold); 2600 } 2601 } 2602 } 2603 2604 private void failRegionAction(MultiResponse.Builder responseBuilder, 2605 RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction, 2606 CellScanner cellScanner, Throwable error) { 2607 rpcServer.getMetrics().exception(error); 2608 regionActionResultBuilder.setException(ResponseConverter.buildException(error)); 2609 responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); 2610 // All Mutations in this RegionAction not executed as we can not see the Region online here 2611 // in this RS. Will be retried from Client. 
// corresponding to these Mutations.
    if (cellScanner != null) {
      skipCellsForMutations(regionAction.getActionList(), cellScanner);
    }
  }

  /**
   * Returns true if this action looks like a replication request: a Put or Delete mutation
   * carrying the special replication attribute set by the replication framework.
   */
  private boolean isReplicationRequest(Action action) {
    // replication request can only be put or delete.
    if (!action.hasMutation()) {
      return false;
    }
    MutationProto mutation = action.getMutation();
    MutationType type = mutation.getMutateType();
    if (type != MutationType.PUT && type != MutationType.DELETE) {
      return false;
    }
    // replication will set a special attribute so we can make use of it to decide whether a request
    // is for replication.
    return mutation.getAttributeList().stream().map(p -> p.getName())
      .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
  }

  /**
   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
   * @param rpcc    the RPC controller
   * @param request the multi request
   */
  @Override
  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
    throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }

    checkBatchSizeAndLogLargeSize(request);

    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? getAndReset(controller) : null;

    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;

    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
    this.rpcMultiRequestCount.increment();
    this.requestCount.increment();
    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();

    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
    // following logic is for backward compatibility as old clients still use
    // MultiRequest#condition in case of checkAndMutate with RowMutations.
    if (request.hasCondition()) {
      if (request.getRegionActionList().isEmpty()) {
        // If the region action list is empty, do nothing.
        responseBuilder.setProcessed(true);
        return responseBuilder.build();
      }

      RegionAction regionAction = request.getRegionAction(0);

      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
      // we can assume regionAction.getAtomic() is true here.
      assert regionAction.getAtomic();

      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
          regionAction.hasCondition());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        return responseBuilder.build();
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
        // We only allow replication in standby state and it will not set the atomic flag.
        if (rejectIfFromClient) {
          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
            new DoNotRetryIOException(
              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
          return responseBuilder.build();
        }

        try {
          CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
            cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
          responseBuilder.setProcessed(result.isSuccess());
          ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
            ClientProtos.ResultOrException.newBuilder();
          for (int i = 0; i < regionAction.getActionCount(); i++) {
            // To unify the response format with doNonAtomicRegionMutation and read through
            // client's AsyncProcess we have to add an empty result instance per operation
            resultOrExceptionOrBuilder.clear();
            resultOrExceptionOrBuilder.setIndex(i);
            regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
          }
        } catch (IOException e) {
          rpcServer.getMetrics().exception(e);
          // As it's an atomic operation with a condition, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        }
      } finally {
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
      }
      return responseBuilder.build();
    }

    // this will contain all the cells that we need to return. It's created later, if needed.
    List<ExtendedCellScannable> cellsToReturn = null;
    RegionScannersCloseCallBack closeCallBack = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
      new HashMap<>(request.getRegionActionCount());

    for (RegionAction regionAction : request.getRegionActionList()) {
      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();
      regionActionResultBuilder.clear();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
          regionAction.hasCondition());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        continue; // For this region it's a failure.
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);

        if (regionAction.hasCondition()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }

          try {
            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
              ClientProtos.ResultOrException.newBuilder();
            if (regionAction.getActionCount() == 1) {
              CheckAndMutateResult result =
                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              resultOrExceptionOrBuilder.setIndex(0);
              if (result.getResult() != null) {
                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
              }
              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
            } else {
              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              for (int i = 0; i < regionAction.getActionCount(); i++) {
                if (i == 0 && result.getResult() != null) {
                  // Set the result of the Increment/Append operations to the first element of the
                  // ResultOrException list
                  resultOrExceptionOrBuilder.setIndex(i);
                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
                  continue;
                }
                // To unify the response format with doNonAtomicRegionMutation and read through
                // client's AsyncProcess we have to add an empty result instance per operation
                resultOrExceptionOrBuilder.clear();
                resultOrExceptionOrBuilder.setIndex(i);
                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
              }
            }
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's an atomic operation with a condition, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }
          try {
            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
              cellScanner, nonceGroup, spaceQuotaEnforcement);
            regionActionResultBuilder.setProcessed(true);
            // We no longer use MultiResponse#processed. Instead, we use
            // RegionActionResult#processed. This is for backward compatibility for old clients.
            responseBuilder.setProcessed(true);
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's atomic, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else {
          if (
            rejectIfFromClient && regionAction.getActionCount() > 0
              && !isReplicationRequest(regionAction.getAction(0))
          ) {
            // fail if it is not a replication request
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }
          // doNonAtomicRegionMutation manages the exception internally
          if (context != null && closeCallBack == null) {
            // An RpcCallBack that creates a list of scanners that needs to perform callBack
            // operation on completion of multiGets.
            // Set this only once
            closeCallBack = new RegionScannersCloseCallBack();
            context.setCallBack(closeCallBack);
          }
          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
            spaceQuotaEnforcement);
        }
      } finally {
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        regionStats.put(regionSpecifier, regionLoadStats);
      }
    }
    // Load the controller with the Cells to return.
    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
      controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(cellsToReturn));
    }

    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
      builder.addRegion(stat.getKey());
      builder.addStat(stat.getValue());
    }
    responseBuilder.setRegionStatistics(builder);
    return responseBuilder.build();
  }

  /** Skips the cells of every action in the list; used when a whole RegionAction fails. */
  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    for (Action action : actions) {
      skipCellsForMutation(action, cellScanner);
    }
  }

  /** Advances the CellScanner past the cells associated with a single mutation action. */
  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    try {
      if (action.hasMutation()) {
        MutationProto m = action.getMutation();
        if (m.hasAssociatedCellCount()) {
          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
            cellScanner.advance();
          }
        }
      }
    } catch (IOException e) {
      // No need to handle these individual Mutation level issues. Any way this entire RegionAction
      // marked as failed as we could not see the Region here. At client side the top level
      // RegionAction exception will be considered first.
      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
    }
  }

  /**
   * Mutate data in a table.
   * @param rpcc    the RPC controller
   * @param request the mutate request
   */
  @Override
  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
    throws ServiceException {
    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
    OperationQuota quota = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    // Clear scanner so we are not holding on to reference across call.
    if (controller != null) {
      controller.setCellScanner(null);
    }
    try {
      checkOpen();
      requestCount.increment();
      rpcMutateRequestCount.increment();
      HRegion region = getRegion(request.getRegion());
      rejectIfInStandByState(region);
      MutateResponse.Builder builder = MutateResponse.newBuilder();
      MutationProto mutation = request.getMutation();
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
      OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request);
      quota = getRpcQuotaManager().checkBatchQuota(region, operationType);
      ActivePolicyEnforcement spaceQuotaEnforcement =
        getSpaceQuotaManager().getActiveEnforcements();

      if (request.hasCondition()) {
        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
          request.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
        builder.setProcessed(result.isSuccess());
        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
        if (clientCellBlockSupported) {
          addSize(context, result.getResult());
        }
      } else {
        Result r = null;
        Boolean processed = null;
        MutationType type = mutation.getMutateType();
        switch (type) {
          case APPEND:
            // TODO: this doesn't actually check anything.
            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
              context);
            break;
          case INCREMENT:
            // TODO: this doesn't actually check anything.
            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
              context);
            break;
          case PUT:
            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          case DELETE:
            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          default:
            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
        }
        if (processed != null) {
          builder.setProcessed(processed);
        }
        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
        addResult(builder, r, controller, clientCellBlockSupported);
        if (clientCellBlockSupported) {
          addSize(context, r);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      // An IOException here may indicate trouble with the underlying filesystem; probe it.
      server.checkFileSystem();
      throw new ServiceException(ie);
    } finally {
      if (quota != null) {
        quota.close();
      }
    }
  }

  /** Applies a single Put to the region, enforcing cell-size limits and space quotas. */
  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Put put = ProtobufUtil.toPut(mutation, cellScanner);
    checkCellSizeLimit(region, put);
    spaceQuota.getPolicyEnforcement(region).check(put);
    quota.addMutation(put);
    region.put(put);

    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updatePut(region, after - before);
    }
  }

  /** Applies a single Delete to the region, enforcing cell-size limits and space quotas. */
  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
    checkCellSizeLimit(region, delete);
    spaceQuota.getPolicyEnforcement(region).check(delete);
    quota.addMutation(delete);
    region.delete(delete);

    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updateDelete(region, after - before);
    }
  }

  /**
   * Runs a single checkAndMutate operation, honoring coprocessor pre/post hooks (a non-null
   * pre-hook result bypasses the region call) and updating check-and-mutate metrics.
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
    quota.addMutation((Mutation) checkAndMutate.getAction());

    CheckAndMutateResult result = null;
    if (region.getCoprocessorHost() != null) {
      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
    }
    if (result == null) {
      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
      if (region.getCoprocessorHost() != null) {
        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
      }
    }
    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      long blockBytesScanned =
        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
      metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned);

      MutationType type = mutation.getMutateType();
      switch (type) {
        case PUT:
          metricsRegionServer.updateCheckAndPut(region, after - before);
          break;
        case DELETE:
          metricsRegionServer.updateCheckAndDelete(region, after - before);
          break;
        default:
          break;
      }
    }
    return result;
  }

  // This is used to keep compatible with the old client implementation. Consider remove it if we
  // decide to drop the support of the client that still sends close request to a region scanner
  // which has already been exhausted.
3070 @Deprecated 3071 private static final IOException SCANNER_ALREADY_CLOSED = new IOException() { 3072 3073 private static final long serialVersionUID = -4305297078988180130L; 3074 3075 @Override 3076 public synchronized Throwable fillInStackTrace() { 3077 return this; 3078 } 3079 }; 3080 3081 private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException { 3082 String scannerName = toScannerName(request.getScannerId()); 3083 RegionScannerHolder rsh = this.scanners.get(scannerName); 3084 if (rsh == null) { 3085 // just ignore the next or close request if scanner does not exists. 3086 Long lastCallSeq = closedScanners.getIfPresent(scannerName); 3087 if (lastCallSeq != null) { 3088 // Check the sequence number to catch if the last call was incorrectly retried. 3089 // The only allowed scenario is when the scanner is exhausted and one more scan 3090 // request arrives - in this case returning 0 rows is correct. 3091 if (request.hasNextCallSeq() && request.getNextCallSeq() != lastCallSeq + 1) { 3092 throw new OutOfOrderScannerNextException("Expected nextCallSeq for closed request: " 3093 + (lastCallSeq + 1) + " But the nextCallSeq got from client: " 3094 + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request)); 3095 } else { 3096 throw SCANNER_ALREADY_CLOSED; 3097 } 3098 } else { 3099 LOG.warn("Client tried to access missing scanner " + scannerName); 3100 throw new UnknownScannerException( 3101 "Unknown scanner '" + scannerName + "'. 
This can happen due to any of the following " 3102 + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " 3103 + "long wait between consecutive client checkins, c) Server may be closing down, " 3104 + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " 3105 + "possible fix would be increasing the value of" 3106 + "'hbase.client.scanner.timeout.period' configuration."); 3107 } 3108 } 3109 rejectIfInStandByState(rsh.r); 3110 RegionInfo hri = rsh.s.getRegionInfo(); 3111 // Yes, should be the same instance 3112 if (server.getOnlineRegion(hri.getRegionName()) != rsh.r) { 3113 String msg = "Region has changed on the scanner " + scannerName + ": regionName=" 3114 + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r; 3115 LOG.warn(msg + ", closing..."); 3116 scanners.remove(scannerName); 3117 try { 3118 rsh.s.close(); 3119 } catch (IOException e) { 3120 LOG.warn("Getting exception closing " + scannerName, e); 3121 } finally { 3122 try { 3123 server.getLeaseManager().cancelLease(scannerName); 3124 } catch (LeaseException e) { 3125 LOG.warn("Getting exception closing " + scannerName, e); 3126 } 3127 } 3128 throw new NotServingRegionException(msg); 3129 } 3130 return rsh; 3131 } 3132 3133 /** 3134 * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder 3135 * value. 3136 */ 3137 private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request, 3138 ScanResponse.Builder builder) throws IOException { 3139 HRegion region = getRegion(request.getRegion()); 3140 rejectIfInStandByState(region); 3141 ClientProtos.Scan protoScan = request.getScan(); 3142 boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); 3143 Scan scan = ProtobufUtil.toScan(protoScan); 3144 // if the request doesn't set this, get the default region setting. 
3145 if (!isLoadingCfsOnDemandSet) { 3146 scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); 3147 } 3148 3149 if (!scan.hasFamilies()) { 3150 // Adding all families to scanner 3151 for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) { 3152 scan.addFamily(family); 3153 } 3154 } 3155 if (region.getCoprocessorHost() != null) { 3156 // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a 3157 // wrapper for the core created RegionScanner 3158 region.getCoprocessorHost().preScannerOpen(scan); 3159 } 3160 RegionScannerImpl coreScanner = region.getScanner(scan); 3161 Shipper shipper = coreScanner; 3162 RegionScanner scanner = coreScanner; 3163 try { 3164 if (region.getCoprocessorHost() != null) { 3165 scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); 3166 } 3167 } catch (Exception e) { 3168 // Although region coprocessor is for advanced users and they should take care of the 3169 // implementation to not damage the HBase system, closing the scanner on exception here does 3170 // not have any bad side effect, so let's do it 3171 scanner.close(); 3172 throw e; 3173 } 3174 long scannerId = scannerIdGenerator.generateNewScannerId(); 3175 builder.setScannerId(scannerId); 3176 builder.setMvccReadPoint(scanner.getMvccReadPoint()); 3177 builder.setTtl(scannerLeaseTimeoutPeriod); 3178 String scannerName = toScannerName(scannerId); 3179 3180 boolean fullRegionScan = 3181 !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region); 3182 3183 return new Pair<String, RegionScannerHolder>(scannerName, 3184 addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan)); 3185 } 3186 3187 /** 3188 * The returned String is used as key doing look up of outstanding Scanners in this Servers' 3189 * this.scanners, the Map of outstanding scanners and their current state. 3190 * @param scannerId A scanner long id. 
3191 * @return The long id as a String. 3192 */ 3193 private static String toScannerName(long scannerId) { 3194 return Long.toString(scannerId); 3195 } 3196 3197 private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh) 3198 throws OutOfOrderScannerNextException { 3199 // if nextCallSeq does not match throw Exception straight away. This needs to be 3200 // performed even before checking of Lease. 3201 // See HBASE-5974 3202 if (request.hasNextCallSeq()) { 3203 long callSeq = request.getNextCallSeq(); 3204 if (!rsh.incNextCallSeq(callSeq)) { 3205 throw new OutOfOrderScannerNextException( 3206 "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: " 3207 + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request)); 3208 } 3209 } 3210 } 3211 3212 private void addScannerLeaseBack(LeaseManager.Lease lease) { 3213 try { 3214 server.getLeaseManager().addLease(lease); 3215 } catch (LeaseStillHeldException e) { 3216 // should not happen as the scanner id is unique. 3217 throw new AssertionError(e); 3218 } 3219 } 3220 3221 // visible for testing only 3222 long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller, 3223 boolean allowHeartbeatMessages) { 3224 // Set the time limit to be half of the more restrictive timeout value (one of the 3225 // timeout values must be positive). In the event that both values are positive, the 3226 // more restrictive of the two is used to calculate the limit. 3227 if (allowHeartbeatMessages) { 3228 long now = EnvironmentEdgeManager.currentTime(); 3229 long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now); 3230 if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) { 3231 long timeLimitDelta; 3232 if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) { 3233 timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout); 3234 } else { 3235 timeLimitDelta = 3236 scannerLeaseTimeoutPeriod > 0 ? 
scannerLeaseTimeoutPeriod : remainingTimeout; 3237 } 3238 3239 // Use half of whichever timeout value was more restrictive... But don't allow 3240 // the time limit to be less than the allowable minimum (could cause an 3241 // immediate timeout before scanning any data). 3242 timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta); 3243 return now + timeLimitDelta; 3244 } 3245 } 3246 // Default value of timeLimit is negative to indicate no timeLimit should be 3247 // enforced. 3248 return -1L; 3249 } 3250 3251 private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) { 3252 long timeout; 3253 if (controller != null && controller.getCallTimeout() > 0) { 3254 timeout = controller.getCallTimeout(); 3255 } else if (rpcTimeout > 0) { 3256 timeout = rpcTimeout; 3257 } else { 3258 return -1; 3259 } 3260 if (call != null) { 3261 timeout -= (now - call.getReceiveTime()); 3262 } 3263 // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high. 3264 // return minimum value here in that case so we count this in calculating the final delta. 3265 return Math.max(minimumScanTimeLimitDelta, timeout); 3266 } 3267 3268 private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows, 3269 ScannerContext scannerContext, ScanResponse.Builder builder) { 3270 if (numOfCompleteRows >= limitOfRows) { 3271 if (LOG.isTraceEnabled()) { 3272 LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows 3273 + " scannerContext: " + scannerContext); 3274 } 3275 builder.setMoreResults(false); 3276 } 3277 } 3278 3279 // return whether we have more results in region. 
  /**
   * Core of the scan RPC: repeatedly calls {@code RegionScanner.nextRaw} accumulating rows
   * into {@code results} until the result-count, size, time or row limit is hit, honoring
   * partial-result and heartbeat capabilities advertised by the client. Outcome flags
   * (moreResultsInRegion, heartbeat, cursor, scan metrics) are written into {@code builder}.
   * NOTE(review): this block ends inside the method's finally clause; the remainder follows
   * directly after it in the file.
   */
  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
    long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
    ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
    HRegion region = rsh.r;
    RegionScanner scanner = rsh.s;
    // Effective size limit: the scanner's own max result size capped by quota, or the quota
    // cap alone if the scanner does not set one.
    long maxResultSize;
    if (scanner.getMaxResultSize() > 0) {
      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
    } else {
      maxResultSize = maxQuotaResultSize;
    }
    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
    // arbitrary 32. TODO: keep record of general size of results being returned.
    ArrayList<Cell> values = new ArrayList<>(32);
    region.startRegionOperation(Operation.SCAN);
    long before = EnvironmentEdgeManager.currentTime();
    // Used to check if we've matched the row limit set on the Scan
    int numOfCompleteRows = 0;
    // Count of times we call nextRaw; can be > numOfCompleteRows.
    int numOfNextRawCalls = 0;
    try {
      int numOfResults = 0;
      synchronized (scanner) {
        // Results from a non-default replica are marked stale.
        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
        boolean clientHandlesPartials =
          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
        boolean clientHandlesHeartbeats =
          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();

        // On the server side we must ensure that the correct ordering of partial results is
        // returned to the client to allow them to properly reconstruct the partial results.
        // If the coprocessor host is adding to the result list, we cannot guarantee the
        // correct ordering of partial results and so we prevent partial results from being
        // formed.
        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
        boolean moreRows = false;

        // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
        // certain time threshold on the server. When the time threshold is exceeded, the
        // server stops the scan and sends back whatever Results it has accumulated within
        // that time period (may be empty). Since heartbeat messages have the potential to
        // create partial Results (in the event that the timeout occurs in the middle of a
        // row), we must only generate heartbeat messages when the client can handle both
        // heartbeats AND partials
        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;

        long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);

        final LimitScope sizeScope =
          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
        final LimitScope timeScope =
          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;

        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();

        // Configure with limits for this RPC. Set keep progress true since size progress
        // towards size limit should be kept between calls to nextRaw
        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
        // of heap size occupied by cells(being read). Cell data means its key and value parts.
        // maxQuotaResultSize - max results just from server side configuration and quotas, without
        // user's specified max. We use this for evaluating limits based on blocks (not cells).
        // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
        // cell or block size from maximum here so we adhere to total limits of request.
        // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
        // have accumulated block bytes. If not, this will be 0 for block size.
        long maxCellSize = maxResultSize;
        long maxBlockSize = maxQuotaResultSize;
        if (rpcCall != null) {
          maxBlockSize -= rpcCall.getBlockBytesScanned();
          maxCellSize -= rpcCall.getResponseCellSize();
        }

        contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
        contextBuilder.setBatchLimit(scanner.getBatch());
        contextBuilder.setTimeLimit(timeScope, timeLimit);
        contextBuilder.setTrackMetrics(trackMetrics);
        ScannerContext scannerContext = contextBuilder.build();
        boolean limitReached = false;
        while (numOfResults < maxResults) {
          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
          // batch limit is a limit on the number of cells per Result. Thus, if progress is
          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
          // reset the batch progress between nextRaw invocations since we don't want the
          // batch progress from previous calls to affect future calls
          scannerContext.setBatchProgress(0);
          assert values.isEmpty();

          // Collect values to be returned here
          moreRows = scanner.nextRaw(values, scannerContext);
          if (rpcCall == null) {
            // When there is no RpcCallContext,copy EC to heap, then the scanner would close,
            // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
            // buffers.See more details in HBASE-26036.
            CellUtil.cloneIfNecessary(values);
          }
          numOfNextRawCalls++;

          if (!values.isEmpty()) {
            if (limitOfRows > 0) {
              // First we need to check if the last result is partial and we have a row change. If
              // so then we need to increase the numOfCompleteRows.
              if (results.isEmpty()) {
                if (
                  rsh.rowOfLastPartialResult != null
                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              } else {
                Result lastResult = results.get(results.size() - 1);
                if (
                  lastResult.mayHaveMoreCellsInRow()
                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              }
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
            results.add(r);
            numOfResults++;
            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
              numOfCompleteRows++;
              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
          } else if (!moreRows && !results.isEmpty()) {
            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
            // last result is false. Otherwise it's possible that: the first nextRaw returned
            // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
            // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
            // return with moreRows=false, which means moreResultsInRegion would be false, it will
            // be a contradictory state (HBASE-21206).
            int lastIdx = results.size() - 1;
            Result r = results.get(lastIdx);
            if (r.mayHaveMoreCellsInRow()) {
              results.set(lastIdx,
                PackagePrivateFieldAccessor.createResult(
                  PackagePrivateFieldAccessor.getExtendedRawCells(r), r.getExists(), r.isStale(),
                  false));
            }
          }
          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
          boolean resultsLimitReached = numOfResults >= maxResults;
          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

          if (limitReached || !moreRows) {
            // With block size limit, we may exceed size limit without collecting any results.
            // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
            // or cursor if results were collected, for example for cell size or heap size limits.
            boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
            // We only want to mark a ScanResponse as a heartbeat message in the event that
            // there are more values to be read server side. If there aren't more values,
            // marking it as a heartbeat is wasteful because the client will need to issue
            // another ScanRequest only to realize that they already have all the values
            if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
              // Heartbeat messages occur when the time limit has been reached, or size limit has
              // been reached before collecting any results. This can happen for heavily filtered
              // scans which scan over too many blocks.
              builder.setHeartbeatMessage(true);
              if (rsh.needCursor) {
                Cell cursorCell = scannerContext.getLastPeekedCell();
                if (cursorCell != null) {
                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
                }
              }
            }
            break;
          }
          values.clear();
        }
        if (rpcCall != null) {
          rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
        }
        builder.setMoreResultsInRegion(moreRows);
        // Check to see if the client requested that we track metrics server side. If the
        // client requested metrics, retrieve the metrics from the scanner context.
        if (trackMetrics) {
          // rather than increment yet another counter in StoreScanner, just set the value here
          // from block size progress before writing into the response
          scannerContext.getMetrics().countOfBlockBytesScanned
            .set(scannerContext.getBlockSizeProgress());
          if (rpcCall != null) {
            scannerContext.getMetrics().fsReadTime.set(rpcCall.getFsReadTime());
          }
          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();

          for (Entry<String, Long> entry : metrics.entrySet()) {
            pairBuilder.setName(entry.getKey());
            pairBuilder.setValue(entry.getValue());
            metricBuilder.addMetrics(pairBuilder.build());
          }

          builder.setScanMetrics(metricBuilder.build());
        }
      }
    } finally {
      region.closeRegionOperation();
      // Update serverside metrics, even on error.
3495 long end = EnvironmentEdgeManager.currentTime(); 3496 3497 long responseCellSize = 0; 3498 long blockBytesScanned = 0; 3499 if (rpcCall != null) { 3500 responseCellSize = rpcCall.getResponseCellSize(); 3501 blockBytesScanned = rpcCall.getBlockBytesScanned(); 3502 rsh.updateBlockBytesScanned(blockBytesScanned); 3503 } 3504 region.getMetrics().updateScan(); 3505 final MetricsRegionServer metricsRegionServer = server.getMetrics(); 3506 if (metricsRegionServer != null) { 3507 metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned); 3508 metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls); 3509 } 3510 } 3511 // coprocessor postNext hook 3512 if (region.getCoprocessorHost() != null) { 3513 region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true); 3514 } 3515 } 3516 3517 /** 3518 * Scan data in a table. 3519 * @param controller the RPC controller 3520 * @param request the scan request 3521 */ 3522 @Override 3523 public ScanResponse scan(final RpcController controller, final ScanRequest request) 3524 throws ServiceException { 3525 if (controller != null && !(controller instanceof HBaseRpcController)) { 3526 throw new UnsupportedOperationException( 3527 "We only do " + "HBaseRpcControllers! 
FIX IF A PROBLEM: " + controller); 3528 } 3529 if (!request.hasScannerId() && !request.hasScan()) { 3530 throw new ServiceException( 3531 new DoNotRetryIOException("Missing required input: scannerId or scan")); 3532 } 3533 try { 3534 checkOpen(); 3535 } catch (IOException e) { 3536 if (request.hasScannerId()) { 3537 String scannerName = toScannerName(request.getScannerId()); 3538 if (LOG.isDebugEnabled()) { 3539 LOG.debug( 3540 "Server shutting down and client tried to access missing scanner " + scannerName); 3541 } 3542 final LeaseManager leaseManager = server.getLeaseManager(); 3543 if (leaseManager != null) { 3544 try { 3545 leaseManager.cancelLease(scannerName); 3546 } catch (LeaseException le) { 3547 // No problem, ignore 3548 if (LOG.isTraceEnabled()) { 3549 LOG.trace("Un-able to cancel lease of scanner. It could already be closed."); 3550 } 3551 } 3552 } 3553 } 3554 throw new ServiceException(e); 3555 } 3556 requestCount.increment(); 3557 rpcScanRequestCount.increment(); 3558 RegionScannerHolder rsh; 3559 ScanResponse.Builder builder = ScanResponse.newBuilder(); 3560 String scannerName; 3561 try { 3562 if (request.hasScannerId()) { 3563 // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000 3564 // for more details. 3565 long scannerId = request.getScannerId(); 3566 builder.setScannerId(scannerId); 3567 scannerName = toScannerName(scannerId); 3568 rsh = getRegionScanner(request); 3569 } else { 3570 Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder); 3571 scannerName = scannerNameAndRSH.getFirst(); 3572 rsh = scannerNameAndRSH.getSecond(); 3573 } 3574 } catch (IOException e) { 3575 if (e == SCANNER_ALREADY_CLOSED) { 3576 // Now we will close scanner automatically if there are no more results for this region but 3577 // the old client will still send a close request to us. Just ignore it and return. 
3578 return builder.build(); 3579 } 3580 throw new ServiceException(e); 3581 } 3582 if (rsh.fullRegionScan) { 3583 rpcFullScanRequestCount.increment(); 3584 } 3585 HRegion region = rsh.r; 3586 LeaseManager.Lease lease; 3587 try { 3588 // Remove lease while its being processed in server; protects against case 3589 // where processing of request takes > lease expiration time. or null if none found. 3590 lease = server.getLeaseManager().removeLease(scannerName); 3591 } catch (LeaseException e) { 3592 throw new ServiceException(e); 3593 } 3594 if (request.hasRenew() && request.getRenew()) { 3595 // add back and return 3596 addScannerLeaseBack(lease); 3597 try { 3598 checkScanNextCallSeq(request, rsh); 3599 } catch (OutOfOrderScannerNextException e) { 3600 throw new ServiceException(e); 3601 } 3602 return builder.build(); 3603 } 3604 OperationQuota quota; 3605 try { 3606 quota = getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize, 3607 rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference()); 3608 } catch (IOException e) { 3609 addScannerLeaseBack(lease); 3610 throw new ServiceException(e); 3611 } 3612 try { 3613 checkScanNextCallSeq(request, rsh); 3614 } catch (OutOfOrderScannerNextException e) { 3615 addScannerLeaseBack(lease); 3616 throw new ServiceException(e); 3617 } 3618 // Now we have increased the next call sequence. If we give client an error, the retry will 3619 // never success. So we'd better close the scanner and return a DoNotRetryIOException to client 3620 // and then client will try to open a new scanner. 3621 boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false; 3622 int rows; // this is scan.getCaching 3623 if (request.hasNumberOfRows()) { 3624 rows = request.getNumberOfRows(); 3625 } else { 3626 rows = closeScanner ? 0 : 1; 3627 } 3628 RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null); 3629 // now let's do the real scan. 
3630 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize()); 3631 RegionScanner scanner = rsh.s; 3632 // this is the limit of rows for this scan, if we the number of rows reach this value, we will 3633 // close the scanner. 3634 int limitOfRows; 3635 if (request.hasLimitOfRows()) { 3636 limitOfRows = request.getLimitOfRows(); 3637 } else { 3638 limitOfRows = -1; 3639 } 3640 boolean scannerClosed = false; 3641 try { 3642 List<Result> results = new ArrayList<>(Math.min(rows, 512)); 3643 if (rows > 0) { 3644 boolean done = false; 3645 // Call coprocessor. Get region info from scanner. 3646 if (region.getCoprocessorHost() != null) { 3647 Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows); 3648 if (!results.isEmpty()) { 3649 for (Result r : results) { 3650 // add cell size from CP results so we can track response size and update limits 3651 // when calling scan below if !done. We'll also have tracked block size if the CP 3652 // got results from hbase, since StoreScanner tracks that for all calls automatically. 3653 addSize(rpcCall, r); 3654 } 3655 } 3656 if (bypass != null && bypass.booleanValue()) { 3657 done = true; 3658 } 3659 } 3660 if (!done) { 3661 scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows, 3662 results, builder, rpcCall); 3663 } else { 3664 builder.setMoreResultsInRegion(!results.isEmpty()); 3665 } 3666 } else { 3667 // This is a open scanner call with numberOfRow = 0, so set more results in region to true. 3668 builder.setMoreResultsInRegion(true); 3669 } 3670 3671 quota.addScanResult(results); 3672 addResults(builder, results, (HBaseRpcController) controller, 3673 RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()), 3674 isClientCellBlockSupport(rpcCall)); 3675 if (scanner.isFilterDone() && results.isEmpty()) { 3676 // If the scanner's filter - if any - is done with the scan 3677 // only set moreResults to false if the results is empty. 
This is used to keep compatible 3678 // with the old scan implementation where we just ignore the returned results if moreResults 3679 // is false. Can remove the isEmpty check after we get rid of the old implementation. 3680 builder.setMoreResults(false); 3681 } 3682 // Later we may close the scanner depending on this flag so here we need to make sure that we 3683 // have already set this flag. 3684 assert builder.hasMoreResultsInRegion(); 3685 // we only set moreResults to false in the above code, so set it to true if we haven't set it 3686 // yet. 3687 if (!builder.hasMoreResults()) { 3688 builder.setMoreResults(true); 3689 } 3690 if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) { 3691 // Record the last cell of the last result if it is a partial result 3692 // We need this to calculate the complete rows we have returned to client as the 3693 // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the 3694 // current row. We may filter out all the remaining cells for the current row and just 3695 // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to 3696 // check for row change. 3697 Result lastResult = results.get(results.size() - 1); 3698 if (lastResult.mayHaveMoreCellsInRow()) { 3699 rsh.rowOfLastPartialResult = lastResult.getRow(); 3700 } else { 3701 rsh.rowOfLastPartialResult = null; 3702 } 3703 } 3704 if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) { 3705 scannerClosed = true; 3706 closeScanner(region, scanner, scannerName, rpcCall, false); 3707 } 3708 3709 // There's no point returning to a timed out client. 
Throwing ensures scanner is closed 3710 if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) { 3711 throw new TimeoutIOException("Client deadline exceeded, cannot return results"); 3712 } 3713 3714 return builder.build(); 3715 } catch (IOException e) { 3716 try { 3717 // scanner is closed here 3718 scannerClosed = true; 3719 // The scanner state might be left in a dirty state, so we will tell the Client to 3720 // fail this RPC and close the scanner while opening up another one from the start of 3721 // row that the client has last seen. 3722 closeScanner(region, scanner, scannerName, rpcCall, true); 3723 3724 // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is 3725 // used in two different semantics. 3726 // (1) The first is to close the client scanner and bubble up the exception all the way 3727 // to the application. This is preferred when the exception is really un-recoverable 3728 // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this 3729 // bucket usually. 3730 // (2) Second semantics is to close the current region scanner only, but continue the 3731 // client scanner by overriding the exception. This is usually UnknownScannerException, 3732 // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the 3733 // application-level ClientScanner has to continue without bubbling up the exception to 3734 // the client. See ClientScanner code to see how it deals with these special exceptions. 3735 if (e instanceof DoNotRetryIOException) { 3736 throw e; 3737 } 3738 3739 // If it is a FileNotFoundException, wrap as a 3740 // DoNotRetryIOException. This can avoid the retry in ClientScanner. 3741 if (e instanceof FileNotFoundException) { 3742 throw new DoNotRetryIOException(e); 3743 } 3744 3745 // We closed the scanner already. 
        // Instead of throwing the IOException, and client
        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
        // a special exception to save an RPC.
        // NOTE(review): rpcCall is null-checked on other paths in this method; confirm it cannot
        // be null here, otherwise rpcCall.getClientVersionInfo() would NPE while handling e.
        if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
          // 1.4.0+ clients know how to handle
          throw new ScannerResetException("Scanner is closed on the server-side", e);
        } else {
          // older clients do not know about SRE. Just throw USE, which they will handle
          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
            + " scanner state for clients older than 1.3.", e);
        }
      } catch (IOException ioe) {
        throw new ServiceException(ioe);
      }
    } finally {
      if (!scannerClosed) {
        // Adding resets expiration time on lease.
        // The closeCallBack will be set in closeScanner, so here we only care about the
        // shippedCallback.
        if (rpcCall != null) {
          rpcCall.setCallBack(rsh.shippedCallback);
        } else {
          // If the RPC context is null, call rsh.shippedCallback directly to reuse its logic:
          // it releases the internal resources in rsh, and the lease is also added back to the
          // region server's LeaseManager by rsh.shippedCallback.
          runShippedCallback(rsh);
        }
      }
      quota.close();
    }
  }

  /**
   * Runs the holder's shipped callback inline. Used when there is no RPC call context to attach
   * the callback to; the callback releases the internal resources held by {@code rsh} (see the
   * caller's comment about the lease being added back to the LeaseManager).
   */
  private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
    assert rsh.shippedCallback != null;
    try {
      rsh.shippedCallback.run();
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
  }

  /**
   * Closes a region scanner and removes it from the live-scanner map, honoring coprocessor hooks.
   * @param context the current RPC call context, may be null; when non-null the actual close is
   *          deferred to the close callback attached to the call, otherwise the scanner is closed
   *          inline
   * @param isError true when the scanner is being closed because of a failure; only cleanly
   *          closed scanners are recorded in {@code closedScanners}
   */
  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
    RpcCallContext context, boolean isError) throws IOException {
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preScannerClose(scanner)) {
        // bypass the actual close.
        return;
      }
    }
    RegionScannerHolder rsh = scanners.remove(scannerName);
    if (rsh != null) {
      if (context != null) {
        // Defer the actual close to the close callback attached to the RPC call context.
        context.setCallBack(rsh.closeCallBack);
      } else {
        rsh.s.close();
      }
      if (region.getCoprocessorHost() != null) {
        region.getCoprocessorHost().postScannerClose(scanner);
      }
      if (!isError) {
        // Record the next call sequence for the cleanly closed scanner — presumably so
        // late/duplicate RPCs for this scanner id can be recognized; confirm against the
        // readers of closedScanners.
        closedScanners.put(scannerName, rsh.getNextCallSeq());
      }
    }
  }

  /** Delegates a region-server-level coprocessor service call to the server after a pre-check. */
  @Override
  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
    CoprocessorServiceRequest request) throws ServiceException {
    rpcPreCheck("execRegionServerService");
    return server.execRegionServerService(controller, request);
  }

  /**
   * Returns the space quota snapshots currently known to this region server. Responds with an
   * empty snapshot list when the space quota manager is not running.
   */
  @Override
  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
    GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
    try {
      final RegionServerSpaceQuotaManager manager = server.getRegionServerSpaceQuotaManager();
      final GetSpaceQuotaSnapshotsResponse.Builder builder =
        GetSpaceQuotaSnapshotsResponse.newBuilder();
      if (manager != null) {
        final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
        for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
            .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
            .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build());
        }
      }
      return builder.build();
    } catch (Exception e) {
      throw new ServiceException(e);
    }
  }

  /** Evicts the block cache entries for the requested regions and reports eviction stats. */
  @Override
  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
    ClearRegionBlockCacheRequest request) throws ServiceException {
    try {
      rpcPreCheck("clearRegionBlockCache");
      ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder();
      CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
      server.getRegionServerCoprocessorHost().preClearRegionBlockCache();
      List<HRegion> regions = getRegions(request.getRegionList(), stats);
      for (HRegion region : regions) {
        try {
          stats = stats.append(this.server.clearRegionBlockCache(region));
        } catch (Exception e) {
          // A failure on one region must not abort eviction for the others; record it in the
          // per-region exception map instead.
          stats.addException(region.getRegionInfo().getRegionName(), e);
        }
      }
      stats.withMaxCacheSize(server.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
      server.getRegionServerCoprocessorHost().postClearRegionBlockCache(stats.build());
      return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Schedules an AssignRegionHandler for every region in the open request. Table descriptors are
   * looked up through {@code tdCache} first so the same table's descriptor is not re-read
   * (usually from the file system) for every region.
   */
  private void executeOpenRegionProcedures(OpenRegionRequest request,
    Map<TableName, TableDescriptor> tdCache) {
    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableName tableName = regionInfo.getTable();
      TableDescriptor tableDesc = tdCache.get(tableName);
      if (tableDesc == null) {
        try {
          tableDesc = server.getTableDescriptors().get(regionInfo.getTable());
        } catch (IOException e) {
          // Here we do not fail the whole method since we also need to deal with other
          // procedures, and we can not ignore this one, so we still schedule an
          // AssignRegionHandler and it will report back to master if we still can not get the
          // TableDescriptor.
          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
            regionInfo.getTable(), e);
        }
        if (tableDesc != null) {
          tdCache.put(tableName, tableDesc);
        }
      }
      if (regionOpenInfo.getFavoredNodesCount() > 0) {
        server.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
          regionOpenInfo.getFavoredNodesList());
      }
      long procId = regionOpenInfo.getOpenProcId();
      // Only schedule the handler when the server accepts the procedure — presumably
      // submitRegionProcedure returns false for duplicates; confirm its contract.
      if (server.submitRegionProcedure(procId)) {
        server.getExecutorService().submit(
          AssignRegionHandler.create(server, regionInfo, procId, tableDesc, masterSystemTime));
      }
    }
  }

  /** Schedules an UnassignRegionHandler for the region named in the close request. */
  private void executeCloseRegionProcedures(CloseRegionRequest request) {
    String encodedName;
    try {
      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
    } catch (DoNotRetryIOException e) {
      throw new UncheckedIOException("Should not happen", e);
    }
    ServerName destination = request.hasDestinationServer()
      ?
        ProtobufUtil.toServerName(request.getDestinationServer())
      : null;
    long procId = request.getCloseProcId();
    boolean evictCache = request.getEvictCache();
    if (server.submitRegionProcedure(procId)) {
      server.getExecutorService().submit(
        UnassignRegionHandler.create(server, encodedName, procId, false, destination, evictCache));
    }
  }

  /**
   * Instantiates the RSProcedureCallable named in the request and hands it to the server for
   * execution. Instantiation failures are reported straight back to the master via
   * remoteProcedureComplete instead of being thrown.
   */
  private void executeProcedures(RemoteProcedureRequest request) {
    RSProcedureCallable callable;
    try {
      callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
        .getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(),
        request.getProcId(), e);
      server.remoteProcedureComplete(request.getProcId(), e);
      return;
    }
    callable.init(request.getProcData().toByteArray(), server);
    LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
    server.executeProcedure(request.getProcId(), callable);
  }

  /**
   * Entry point for master-driven procedures: region opens, region closes and generic remote
   * procedures, wrapped by the region server coprocessor pre/post hooks.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ExecuteProceduresResponse executeProcedures(RpcController controller,
    ExecuteProceduresRequest request) throws ServiceException {
    try {
      checkOpen();
      throwOnWrongStartCode(request);
      server.getRegionServerCoprocessorHost().preExecuteProcedures();
      if (request.getOpenRegionCount() > 0) {
        // Avoid reading the TableDescriptor every time (usually it will read from the file
        // system); the cache lives only for the duration of this request.
        Map<TableName, TableDescriptor> tdCache = new HashMap<>();
        request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
      }
      if (request.getCloseRegionCount() > 0) {
        request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
      }
      if (request.getProcCount() > 0) {
        request.getProcList().forEach(this::executeProcedures);
      }
      server.getRegionServerCoprocessorHost().postExecuteProcedures();
      return ExecuteProceduresResponse.getDefaultInstance();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  /** Returns all bootstrap nodes currently known to this server. */
  @Override
  public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller,
    GetAllBootstrapNodesRequest request) throws ServiceException {
    GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder();
    server.getBootstrapNodes()
      .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server)));
    return builder.build();
  }

  /**
   * (Re)reads the guardrail settings that may change online: the batch row-count warning
   * threshold, whether to reject batches over that threshold, and the max scanner result size.
   */
  private void setReloadableGuardrails(Configuration conf) {
    rowSizeWarnThreshold =
      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    rejectRowsWithSizeOverThreshold =
      conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
    maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
  }

  /** Re-applies the reloadable guardrail settings after an online configuration change. */
  @Override
  public void onConfigurationChange(Configuration conf) {
    super.onConfigurationChange(conf);
    setReloadableGuardrails(conf);
  }

  /** Lists the files that are fully cached in the block cache, if the cache tracks them. */
  @Override
  public GetCachedFilesListResponse getCachedFilesList(RpcController controller,
    GetCachedFilesListRequest request) throws ServiceException {
    GetCachedFilesListResponse.Builder responseBuilder = GetCachedFilesListResponse.newBuilder();
    List<String> fullyCachedFiles = new ArrayList<>();
    server.getBlockCache().flatMap(BlockCache::getFullyCachedFiles).ifPresent(fcf -> {
      fullyCachedFiles.addAll(fcf.keySet());
    });
    return responseBuilder.addAllCachedFiles(fullyCachedFiles).build();
  }
}