001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.UncheckedIOException; 024import java.net.BindException; 025import java.net.InetAddress; 026import java.net.InetSocketAddress; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.Iterator; 032import java.util.List; 033import java.util.Map; 034import java.util.Map.Entry; 035import java.util.NavigableMap; 036import java.util.Set; 037import java.util.TreeSet; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentMap; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicBoolean; 042import java.util.concurrent.atomic.AtomicLong; 043import java.util.concurrent.atomic.LongAdder; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.fs.FileSystem; 046import org.apache.hadoop.fs.Path; 047import org.apache.hadoop.hbase.CacheEvictionStats; 048import 
org.apache.hadoop.hbase.CacheEvictionStatsBuilder; 049import org.apache.hadoop.hbase.Cell; 050import org.apache.hadoop.hbase.CellScannable; 051import org.apache.hadoop.hbase.CellScanner; 052import org.apache.hadoop.hbase.CellUtil; 053import org.apache.hadoop.hbase.DoNotRetryIOException; 054import org.apache.hadoop.hbase.DroppedSnapshotException; 055import org.apache.hadoop.hbase.HBaseIOException; 056import org.apache.hadoop.hbase.HBaseRpcServicesBase; 057import org.apache.hadoop.hbase.HConstants; 058import org.apache.hadoop.hbase.MultiActionResultTooLarge; 059import org.apache.hadoop.hbase.NotServingRegionException; 060import org.apache.hadoop.hbase.PrivateCellUtil; 061import org.apache.hadoop.hbase.RegionTooBusyException; 062import org.apache.hadoop.hbase.Server; 063import org.apache.hadoop.hbase.ServerName; 064import org.apache.hadoop.hbase.TableName; 065import org.apache.hadoop.hbase.UnknownScannerException; 066import org.apache.hadoop.hbase.client.Append; 067import org.apache.hadoop.hbase.client.CheckAndMutate; 068import org.apache.hadoop.hbase.client.CheckAndMutateResult; 069import org.apache.hadoop.hbase.client.Delete; 070import org.apache.hadoop.hbase.client.Durability; 071import org.apache.hadoop.hbase.client.Get; 072import org.apache.hadoop.hbase.client.Increment; 073import org.apache.hadoop.hbase.client.Mutation; 074import org.apache.hadoop.hbase.client.OperationWithAttributes; 075import org.apache.hadoop.hbase.client.Put; 076import org.apache.hadoop.hbase.client.RegionInfo; 077import org.apache.hadoop.hbase.client.RegionReplicaUtil; 078import org.apache.hadoop.hbase.client.Result; 079import org.apache.hadoop.hbase.client.Row; 080import org.apache.hadoop.hbase.client.Scan; 081import org.apache.hadoop.hbase.client.TableDescriptor; 082import org.apache.hadoop.hbase.client.VersionInfoUtil; 083import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 084import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 085import 
org.apache.hadoop.hbase.exceptions.ScannerResetException; 086import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 087import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 088import org.apache.hadoop.hbase.io.ByteBuffAllocator; 089import org.apache.hadoop.hbase.io.hfile.BlockCache; 090import org.apache.hadoop.hbase.ipc.HBaseRpcController; 091import org.apache.hadoop.hbase.ipc.PriorityFunction; 092import org.apache.hadoop.hbase.ipc.QosPriority; 093import org.apache.hadoop.hbase.ipc.RpcCall; 094import org.apache.hadoop.hbase.ipc.RpcCallContext; 095import org.apache.hadoop.hbase.ipc.RpcCallback; 096import org.apache.hadoop.hbase.ipc.RpcServer; 097import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 098import org.apache.hadoop.hbase.ipc.RpcServerFactory; 099import org.apache.hadoop.hbase.ipc.RpcServerInterface; 100import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 101import org.apache.hadoop.hbase.ipc.ServerRpcController; 102import org.apache.hadoop.hbase.net.Address; 103import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 104import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; 105import org.apache.hadoop.hbase.quotas.OperationQuota; 106import org.apache.hadoop.hbase.quotas.QuotaUtil; 107import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 108import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 109import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 110import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; 111import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease; 112import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException; 113import org.apache.hadoop.hbase.regionserver.Region.Operation; 114import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 115import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 116import 
org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler; 117import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; 118import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; 119import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; 120import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; 121import org.apache.hadoop.hbase.replication.ReplicationUtils; 122import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker; 123import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker; 124import org.apache.hadoop.hbase.security.Superusers; 125import org.apache.hadoop.hbase.security.access.Permission; 126import org.apache.hadoop.hbase.util.Bytes; 127import org.apache.hadoop.hbase.util.DNS; 128import org.apache.hadoop.hbase.util.DNS.ServerType; 129import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 130import org.apache.hadoop.hbase.util.Pair; 131import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 132import org.apache.hadoop.hbase.wal.WAL; 133import org.apache.hadoop.hbase.wal.WALEdit; 134import org.apache.hadoop.hbase.wal.WALKey; 135import org.apache.hadoop.hbase.wal.WALSplitUtil; 136import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 137import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 138import org.apache.yetus.audience.InterfaceAudience; 139import org.slf4j.Logger; 140import org.slf4j.LoggerFactory; 141 142import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 143import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 144import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 145import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 146import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 147import org.apache.hbase.thirdparty.com.google.protobuf.Message; 148import 
org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 149import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 150import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 151import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 152import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 153 154import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 155import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 156import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 171import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 191import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; 212import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; 234import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 245 246/** 247 * Implements the regionserver RPC services. 248 */ 249@InterfaceAudience.Private 250public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer> 251 implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface { 252 253 private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class); 254 255 /** RPC scheduler to use for the region server. */ 256 public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = 257 "hbase.region.server.rpc.scheduler.factory.class"; 258 259 /** 260 * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This 261 * configuration exists to prevent the scenario where a time limit is specified to be so 262 * restrictive that the time limit is reached immediately (before any cells are scanned). 
 */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Upper bound applied when sizing responses; combined with the caller's remaining read quota
  // via Math.min in doNonAtomicRegionMutation. Volatile — NOTE(review): presumably updated on
  // configuration reload; confirm the update path (not visible in this chunk).
  private volatile long maxScannerResultSize;

  // NOTE(review): presumably produces the unique ids under which client scanners are registered
  // in the 'scanners' map below; confirm against the scanner-open path (not visible here).
  private ScannerIdGenerator scannerIdGenerator;
  // All currently open client scanners, keyed by scanner name.
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
  // which may send next or close request to a region scanner which has already been exhausted. The
The
  // entries will be removed automatically after scannerLeaseTimeoutPeriod.
  private final Cache<String, String> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private volatile int rowSizeWarnThreshold;
  /**
   * Whether we should reject requests with very high no of rows i.e. beyond threshold defined by
   * rowSizeWarnThreshold
   */
  private volatile boolean rejectRowsWithSizeOverThreshold;

  // NOTE(review): flag named for the clearCompactionQueues RPC; presumably guards against
  // concurrent clear requests — confirm against the clearCompactionQueues handler (not visible
  // in this chunk).
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below booleans
   * to selectively enable/disable either Admin or Client Service (Rare is the case where you would
   * ever turn off one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
    "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
    "hbase.regionserver.client.executorService";
  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
    "hbase.regionserver.client.meta.executorService";

  /**
   * An Rpc callback for closing a RegionScanner.
351 */ 352 private static final class RegionScannerCloseCallBack implements RpcCallback { 353 354 private final RegionScanner scanner; 355 356 public RegionScannerCloseCallBack(RegionScanner scanner) { 357 this.scanner = scanner; 358 } 359 360 @Override 361 public void run() throws IOException { 362 this.scanner.close(); 363 } 364 } 365 366 /** 367 * An Rpc callback for doing shipped() call on a RegionScanner. 368 */ 369 private class RegionScannerShippedCallBack implements RpcCallback { 370 private final String scannerName; 371 private final Shipper shipper; 372 private final Lease lease; 373 374 public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) { 375 this.scannerName = scannerName; 376 this.shipper = shipper; 377 this.lease = lease; 378 } 379 380 @Override 381 public void run() throws IOException { 382 this.shipper.shipped(); 383 // We're done. On way out re-add the above removed lease. The lease was temp removed for this 384 // Rpc call and we are at end of the call now. Time to add it back. 385 if (scanners.containsKey(scannerName)) { 386 if (lease != null) { 387 server.getLeaseManager().addLease(lease); 388 } 389 } 390 } 391 } 392 393 /** 394 * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on 395 * completion of multiGets. 396 */ 397 static class RegionScannersCloseCallBack implements RpcCallback { 398 private final List<RegionScanner> scanners = new ArrayList<>(); 399 400 public void addScanner(RegionScanner scanner) { 401 this.scanners.add(scanner); 402 } 403 404 @Override 405 public void run() { 406 for (RegionScanner scanner : scanners) { 407 try { 408 scanner.close(); 409 } catch (IOException e) { 410 LOG.error("Exception while closing the scanner " + scanner, e); 411 } 412 } 413 } 414 } 415 416 /** 417 * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together. 
418 */ 419 static final class RegionScannerHolder { 420 private final AtomicLong nextCallSeq = new AtomicLong(0); 421 private final RegionScanner s; 422 private final HRegion r; 423 private final RpcCallback closeCallBack; 424 private final RpcCallback shippedCallback; 425 private byte[] rowOfLastPartialResult; 426 private boolean needCursor; 427 private boolean fullRegionScan; 428 private final String clientIPAndPort; 429 private final String userName; 430 431 RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack, 432 RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan, 433 String clientIPAndPort, String userName) { 434 this.s = s; 435 this.r = r; 436 this.closeCallBack = closeCallBack; 437 this.shippedCallback = shippedCallback; 438 this.needCursor = needCursor; 439 this.fullRegionScan = fullRegionScan; 440 this.clientIPAndPort = clientIPAndPort; 441 this.userName = userName; 442 } 443 444 long getNextCallSeq() { 445 return nextCallSeq.get(); 446 } 447 448 boolean incNextCallSeq(long currentSeq) { 449 // Use CAS to prevent multiple scan request running on the same scanner. 450 return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1); 451 } 452 453 // Should be called only when we need to print lease expired messages otherwise 454 // cache the String once made. 455 @Override 456 public String toString() { 457 return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName 458 + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString(); 459 } 460 } 461 462 /** 463 * Instantiated as a scanner lease. 
If the lease times out, the scanner is closed 464 */ 465 private class ScannerListener implements LeaseListener { 466 private final String scannerName; 467 468 ScannerListener(final String n) { 469 this.scannerName = n; 470 } 471 472 @Override 473 public void leaseExpired() { 474 RegionScannerHolder rsh = scanners.remove(this.scannerName); 475 if (rsh == null) { 476 LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName); 477 return; 478 } 479 LOG.info("Scanner lease {} expired {}", this.scannerName, rsh); 480 server.getMetrics().incrScannerLeaseExpired(); 481 RegionScanner s = rsh.s; 482 HRegion region = null; 483 try { 484 region = server.getRegion(s.getRegionInfo().getRegionName()); 485 if (region != null && region.getCoprocessorHost() != null) { 486 region.getCoprocessorHost().preScannerClose(s); 487 } 488 } catch (IOException e) { 489 LOG.error("Closing scanner {} {}", this.scannerName, rsh, e); 490 } finally { 491 try { 492 s.close(); 493 if (region != null && region.getCoprocessorHost() != null) { 494 region.getCoprocessorHost().postScannerClose(s); 495 } 496 } catch (IOException e) { 497 LOG.error("Closing scanner {} {}", this.scannerName, rsh, e); 498 } 499 } 500 } 501 } 502 503 private static ResultOrException getResultOrException(final ClientProtos.Result r, 504 final int index) { 505 return getResultOrException(ResponseConverter.buildActionResult(r), index); 506 } 507 508 private static ResultOrException getResultOrException(final Exception e, final int index) { 509 return getResultOrException(ResponseConverter.buildActionResult(e), index); 510 } 511 512 private static ResultOrException getResultOrException(final ResultOrException.Builder builder, 513 final int index) { 514 return builder.setIndex(index).build(); 515 } 516 517 /** 518 * Checks for the following pre-checks in order: 519 * <ol> 520 * <li>RegionServer is running</li> 521 * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li> 522 * </ol> 523 
* @param requestName name of rpc request. Used in reporting failures to provide context. 524 * @throws ServiceException If any of the above listed pre-check fails. 525 */ 526 private void rpcPreCheck(String requestName) throws ServiceException { 527 try { 528 checkOpen(); 529 requirePermission(requestName, Permission.Action.ADMIN); 530 } catch (IOException ioe) { 531 throw new ServiceException(ioe); 532 } 533 } 534 535 private boolean isClientCellBlockSupport(RpcCallContext context) { 536 return context != null && context.isClientCellBlockSupported(); 537 } 538 539 private void addResult(final MutateResponse.Builder builder, final Result result, 540 final HBaseRpcController rpcc, boolean clientCellBlockSupported) { 541 if (result == null) return; 542 if (clientCellBlockSupported) { 543 builder.setResult(ProtobufUtil.toResultNoData(result)); 544 rpcc.setCellScanner(result.cellScanner()); 545 } else { 546 ClientProtos.Result pbr = ProtobufUtil.toResult(result); 547 builder.setResult(pbr); 548 } 549 } 550 551 private void addResults(ScanResponse.Builder builder, List<Result> results, 552 HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) { 553 builder.setStale(!isDefaultRegion); 554 if (results.isEmpty()) { 555 return; 556 } 557 if (clientCellBlockSupported) { 558 for (Result res : results) { 559 builder.addCellsPerResult(res.size()); 560 builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow()); 561 } 562 controller.setCellScanner(CellUtil.createCellScanner(results)); 563 } else { 564 for (Result res : results) { 565 ClientProtos.Result pbr = ProtobufUtil.toResult(res); 566 builder.addResults(pbr); 567 } 568 } 569 } 570 571 private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions, 572 CellScanner cellScanner, Condition condition, long nonceGroup, 573 ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { 574 int countOfCompleteMutation = 0; 575 try { 576 if 
(!region.getRegionInfo().isMetaRegion()) {
        // For user regions, try to free memstore memory before running the atomic operation.
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      List<Mutation> mutations = new ArrayList<>();
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : actions) {
        // checkAndMutate carries only mutations; a Get in the action list is a client error.
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto mutation = action.getMutation();
        MutationType type = mutation.getMutateType();
        // Per mutation: materialize it from the proto + cell scanner, bump
        // countOfCompleteMutation (the finally block uses it to skip cells of any mutations we
        // never decoded), run size/space-quota checks, then queue it for the atomic operation.
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(mutation, cellScanner);
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            mutations.add(put);
            break;
          case DELETE:
            // NOTE(review): deletes skip checkCellSizeLimit — presumably because delete markers
            // carry no large values; confirm this asymmetry is intentional.
            Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
            ++countOfCompleteMutation;
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            mutations.add(del);
            break;
          case INCREMENT:
            Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
            // The nonce is only picked up for increment/append mutations in this method.
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, increment);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
            mutations.add(increment);
            break;
          case APPEND:
            Append append = ProtobufUtil.toAppend(mutation, cellScanner);
            nonce = mutation.hasNonce() ?
mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, append);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
            mutations.add(append);
            break;
          default:
            throw new DoNotRetryIOException("invalid mutation type : " + type);
        }
      }

      if (mutations.size() == 0) {
        // Condition-only request: nothing to mutate, report success with no result.
        return new CheckAndMutateResult(true, null);
      } else {
        CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
        CheckAndMutateResult result = null;
        // Coprocessor pre-hook may supply a result and bypass the region operation entirely;
        // otherwise run the region-level checkAndMutate and let the post-hook observe/replace
        // the outcome.
        if (region.getCoprocessorHost() != null) {
          result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
        }
        if (result == null) {
          result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
          if (region.getCoprocessorHost() != null) {
            result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
          }
        }
        return result;
      }
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      // Skip the cells of every mutation we did not fully decode so the shared cell scanner is
      // left positioned consistently for the rest of the request.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }

  /**
   * Execute an append mutation.
   * @return the Result of the append; never null — {@code Result.EMPTY_RESULT} is returned when
   *         the region produced no result (e.g. the default operation was bypassed as indicated
   *         by a RegionObserver).
   */
  private Result append(final HRegion region, final OperationQuota quota,
    final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
    checkCellSizeLimit(region, append);
    spaceQuota.getPolicyEnforcement(region).check(append);
    // Charge the mutation against the caller's operation quota.
    quota.addMutation(append);
    long blockBytesScannedBefore = context != null ?
context.getBlockBytesScanned() : 0; 662 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 663 Result r = region.append(append, nonceGroup, nonce); 664 if (server.getMetrics() != null) { 665 long blockBytesScanned = 666 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 667 server.getMetrics().updateAppend(region, EnvironmentEdgeManager.currentTime() - before, 668 blockBytesScanned); 669 } 670 return r == null ? Result.EMPTY_RESULT : r; 671 } 672 673 /** 674 * Execute an increment mutation. 675 */ 676 private Result increment(final HRegion region, final OperationQuota quota, 677 final MutationProto mutation, final CellScanner cells, long nonceGroup, 678 ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException { 679 long before = EnvironmentEdgeManager.currentTime(); 680 Increment increment = ProtobufUtil.toIncrement(mutation, cells); 681 checkCellSizeLimit(region, increment); 682 spaceQuota.getPolicyEnforcement(region).check(increment); 683 quota.addMutation(increment); 684 long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0; 685 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 686 Result r = region.increment(increment, nonceGroup, nonce); 687 final MetricsRegionServer metricsRegionServer = server.getMetrics(); 688 if (metricsRegionServer != null) { 689 long blockBytesScanned = 690 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 691 metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before, 692 blockBytesScanned); 693 } 694 return r == null ? Result.EMPTY_RESULT : r; 695 } 696 697 /** 698 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when 699 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation. 700 * @param cellsToReturn Could be null. May be allocated in this method. 
This is what this method 701 * returns as a 'result'. 702 * @param closeCallBack the callback to be used with multigets 703 * @param context the current RpcCallContext 704 * @return Return the <code>cellScanner</code> passed 705 */ 706 private List<CellScannable> doNonAtomicRegionMutation(final HRegion region, 707 final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner, 708 final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup, 709 final RegionScannersCloseCallBack closeCallBack, RpcCallContext context, 710 ActivePolicyEnforcement spaceQuotaEnforcement) { 711 // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do 712 // one at a time, we instead pass them in batch. Be aware that the corresponding 713 // ResultOrException instance that matches each Put or Delete is then added down in the 714 // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are 715 // deferred/batched 716 List<ClientProtos.Action> mutations = null; 717 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); 718 IOException sizeIOE = null; 719 ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = 720 ResultOrException.newBuilder(); 721 boolean hasResultOrException = false; 722 for (ClientProtos.Action action : actions.getActionList()) { 723 hasResultOrException = false; 724 resultOrExceptionBuilder.clear(); 725 try { 726 Result r = null; 727 long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0; 728 if ( 729 context != null && context.isRetryImmediatelySupported() 730 && (context.getResponseCellSize() > maxQuotaResultSize 731 || blockBytesScannedBefore + context.getResponseExceptionSize() > maxQuotaResultSize) 732 ) { 733 734 // We're storing the exception since the exception and reason string won't 735 // change after the response size limit is reached. 
736 if (sizeIOE == null) { 737 // We don't need the stack un-winding do don't throw the exception. 738 // Throwing will kill the JVM's JIT. 739 // 740 // Instead just create the exception and then store it. 741 sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: " 742 + context.getResponseCellSize() + " BlockSize: " + blockBytesScannedBefore); 743 744 // Only report the exception once since there's only one request that 745 // caused the exception. Otherwise this number will dominate the exceptions count. 746 rpcServer.getMetrics().exception(sizeIOE); 747 } 748 749 // Now that there's an exception is known to be created 750 // use it for the response. 751 // 752 // This will create a copy in the builder. 753 NameBytesPair pair = ResponseConverter.buildException(sizeIOE); 754 resultOrExceptionBuilder.setException(pair); 755 context.incrementResponseExceptionSize(pair.getSerializedSize()); 756 resultOrExceptionBuilder.setIndex(action.getIndex()); 757 builder.addResultOrException(resultOrExceptionBuilder.build()); 758 skipCellsForMutation(action, cellScanner); 759 continue; 760 } 761 if (action.hasGet()) { 762 long before = EnvironmentEdgeManager.currentTime(); 763 ClientProtos.Get pbGet = action.getGet(); 764 // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do 765 // a get closest before. Throwing the UnknownProtocolException signals it that it needs 766 // to switch and do hbase2 protocol (HBase servers do not tell clients what versions 767 // they are; its a problem for non-native clients like asynchbase. HBASE-20225. 768 if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) { 769 throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? 
" 770 + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " 771 + "reverse Scan."); 772 } 773 try { 774 Get get = ProtobufUtil.toGet(pbGet); 775 if (context != null) { 776 r = get(get, (region), closeCallBack, context); 777 } else { 778 r = region.get(get); 779 } 780 } finally { 781 final MetricsRegionServer metricsRegionServer = server.getMetrics(); 782 if (metricsRegionServer != null) { 783 long blockBytesScanned = 784 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 785 metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before, 786 blockBytesScanned); 787 } 788 } 789 } else if (action.hasServiceCall()) { 790 hasResultOrException = true; 791 Message result = execServiceOnRegion(region, action.getServiceCall()); 792 ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder = 793 ClientProtos.CoprocessorServiceResult.newBuilder(); 794 resultOrExceptionBuilder.setServiceResult(serviceResultBuilder 795 .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName()) 796 // TODO: Copy!!! 797 .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray())))); 798 } else if (action.hasMutation()) { 799 MutationType type = action.getMutation().getMutateType(); 800 if ( 801 type != MutationType.PUT && type != MutationType.DELETE && mutations != null 802 && !mutations.isEmpty() 803 ) { 804 // Flush out any Puts or Deletes already collected. 
805 doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, 806 spaceQuotaEnforcement); 807 mutations.clear(); 808 } 809 switch (type) { 810 case APPEND: 811 r = append(region, quota, action.getMutation(), cellScanner, nonceGroup, 812 spaceQuotaEnforcement, context); 813 break; 814 case INCREMENT: 815 r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup, 816 spaceQuotaEnforcement, context); 817 break; 818 case PUT: 819 case DELETE: 820 // Collect the individual mutations and apply in a batch 821 if (mutations == null) { 822 mutations = new ArrayList<>(actions.getActionCount()); 823 } 824 mutations.add(action); 825 break; 826 default: 827 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); 828 } 829 } else { 830 throw new HBaseIOException("Unexpected Action type"); 831 } 832 if (r != null) { 833 ClientProtos.Result pbResult = null; 834 if (isClientCellBlockSupport(context)) { 835 pbResult = ProtobufUtil.toResultNoData(r); 836 // Hard to guess the size here. Just make a rough guess. 837 if (cellsToReturn == null) { 838 cellsToReturn = new ArrayList<>(); 839 } 840 cellsToReturn.add(r); 841 } else { 842 pbResult = ProtobufUtil.toResult(r); 843 } 844 addSize(context, r); 845 hasResultOrException = true; 846 resultOrExceptionBuilder.setResult(pbResult); 847 } 848 // Could get to here and there was no result and no exception. Presumes we added 849 // a Put or Delete to the collecting Mutations List for adding later. In this 850 // case the corresponding ResultOrException instance for the Put or Delete will be added 851 // down in the doNonAtomicBatchOp method call rather than up here. 
852 } catch (IOException ie) { 853 rpcServer.getMetrics().exception(ie); 854 hasResultOrException = true; 855 NameBytesPair pair = ResponseConverter.buildException(ie); 856 resultOrExceptionBuilder.setException(pair); 857 context.incrementResponseExceptionSize(pair.getSerializedSize()); 858 } 859 if (hasResultOrException) { 860 // Propagate index. 861 resultOrExceptionBuilder.setIndex(action.getIndex()); 862 builder.addResultOrException(resultOrExceptionBuilder.build()); 863 } 864 } 865 // Finish up any outstanding mutations 866 if (!CollectionUtils.isEmpty(mutations)) { 867 doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); 868 } 869 return cellsToReturn; 870 } 871 872 private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException { 873 if (r.maxCellSize > 0) { 874 CellScanner cells = m.cellScanner(); 875 while (cells.advance()) { 876 int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current()); 877 if (size > r.maxCellSize) { 878 String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of " 879 + r.maxCellSize + " bytes"; 880 LOG.debug(msg); 881 throw new DoNotRetryIOException(msg); 882 } 883 } 884 } 885 } 886 887 private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, 888 final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells, 889 long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { 890 // Just throw the exception. The exception will be caught and then added to region-level 891 // exception for RegionAction. Leaving the null to action result is ok since the null 892 // result is viewed as failure by hbase client. And the region-lever exception will be used 893 // to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and 894 // AsyncBatchRpcRetryingCaller#onComplete for more details. 
doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
  }

  /**
   * Execute a batch of Put/Delete mutations non-atomically. On failure, the exception is attached
   * to every action's ResultOrException rather than raised as a region-level exception.
   */
  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    try {
      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
        spaceQuotaEnforcement, false);
    } catch (IOException e) {
      // Set the exception for each action. The mutations in the same RegionAction are grouped
      // into different batches and then processed individually. Hence, we don't set the
      // region-level exception here for the whole RegionAction.
      for (Action mutation : mutations) {
        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
      }
    }
  }

  /**
   * Execute a list of mutations.
   */
  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
    throws IOException {
    Mutation[] mArray = new Mutation[mutations.size()];
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      /**
       * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions
       * since the mutation array may have been reordered. In order to return the right result or
       * exception to the corresponding actions, we need to know which action each mutation belongs
       * to. We can't sort the ClientProtos.Action array, since they are bonded to cellscanners.
       */
      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
      int i = 0;
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : mutations) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto m = action.getMutation();
        Mutation mutation;
        switch (m.getMutateType()) {
          case PUT:
            mutation = ProtobufUtil.toPut(m, cells);
            batchContainsPuts = true;
            break;

          case DELETE:
            mutation = ProtobufUtil.toDelete(m, cells);
            batchContainsDelete = true;
            break;

          case INCREMENT:
            mutation = ProtobufUtil.toIncrement(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          case APPEND:
            mutation = ProtobufUtil.toAppend(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          default:
            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
        }
        mutationActionMap.put(mutation, action);
        mArray[i++] = mutation;
        checkCellSizeLimit(region, mutation);
        // Check if a space quota disallows this mutation
        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
        quota.addMutation(mutation);
      }

      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }

      // HBASE-17924
      // Sort to improve lock efficiency for a non-atomic batch of operations. If atomic,
      // order is preserved as it's expected from the client
      if (!atomic) {
        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
      }

      OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);

      // When atomic is true, it indicates that the mutateRow API or the batch API with
      // RowMutations is called. In this case, we need to merge the results of the
      // Increment/Append operations if the mutations include those operations, and set the merged
      // result to the first element of the ResultOrException list
      if (atomic) {
        List<ResultOrException> resultOrExceptions = new ArrayList<>();
        List<Result> results = new ArrayList<>();
        for (i = 0; i < codes.length; i++) {
          if (codes[i].getResult() != null) {
            results.add(codes[i].getResult());
          }
          if (i != 0) {
            resultOrExceptions
              .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i));
          }
        }

        if (results.isEmpty()) {
          builder.addResultOrException(
            getResultOrException(ClientProtos.Result.getDefaultInstance(), 0));
        } else {
          // Merge the results of the Increment/Append operations
          List<Cell> cellList = new ArrayList<>();
          for (Result result : results) {
            if (result.rawCells() != null) {
              cellList.addAll(Arrays.asList(result.rawCells()));
            }
          }
          Result result = Result.create(cellList);

          // Set the merged result of the Increment/Append operations to the first element of the
          // ResultOrException list
          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
        }

        builder.addAllResultOrException(resultOrExceptions);
        return;
      }

      for (i = 0; i < codes.length; i++) {
        Mutation currentMutation = mArray[i];
        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
        Exception e;
        // NOTE(review): the default label appears before the SUCCESS and STORE_TOO_BUSY cases.
        // This is legal Java and behaves correctly because every case breaks, but keep the
        // default label last if this switch is ever restructured to avoid accidental fall-through.
        switch (codes[i].getOperationStatusCode()) {
          case BAD_FAMILY:
            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SANITY_CHECK_FAILURE:
            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          default:
            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SUCCESS:
            builder.addResultOrException(
              getResultOrException(ClientProtos.Result.getDefaultInstance(), index));
            break;

          case STORE_TOO_BUSY:
            e = new RegionTooBusyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;
        }
      }
    } finally {
      int processedMutationIndex = 0;
      for (Action mutation : mutations) {
        // A non-null mArray[i] means that mutation's cells were already read from the scanner;
        // only skip cells for mutations that were never materialized.
        if (mArray[processedMutationIndex++] == null) {
          skipCellsForMutation(mutation, cells);
        }
      }
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  /**
   * Update put/delete batch latency metrics, if a metrics source is available.
   */
  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
    boolean batchContainsDelete) {
    final MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      if (batchContainsPuts) {
        metricsRegionServer.updatePutBatch(region, after - starttime);
      }
      if (batchContainsDelete) {
        metricsRegionServer.updateDeleteBatch(region, after - starttime);
      }
    }
  }

  /**
   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
   *         exceptionMessage if any
   * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying
   *             edits for secondary replicas any more, see
   *             {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}.
   */
  @Deprecated
  private OperationStatus[] doReplayBatchOp(final HRegion region,
    final List<MutationReplay> mutations, long replaySeqId) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
        MutationReplay m = it.next();

        if (m.getType() == MutationType.PUT) {
          batchContainsPuts = true;
        } else {
          batchContainsDelete = true;
        }

        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
        if (metaCells != null && !metaCells.isEmpty()) {
          for (Cell metaCell : metaCells) {
            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
            HRegion hRegion = region;
            if (compactionDesc != null) {
              // replay the compaction. Remove the files from stores only if we are the primary
              // region replica (thus own the files)
              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
                replaySeqId);
              continue;
            }
            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
            if (flushDesc != null && !isDefaultReplica) {
              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
              continue;
            }
            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
            if (regionEvent != null && !isDefaultReplica) {
              hRegion.replayWALRegionEventMarker(regionEvent);
              continue;
            }
            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
            if (bulkLoadEvent != null) {
              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
              continue;
            }
          }
          it.remove();
        }
      }
      requestCount.increment();
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
        replaySeqId);
    } finally {
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  private void closeAllScanners() {
    // Close any outstanding scanners. Means they'll get an UnknownScanner
    // exception next time they come in.
for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
      try {
        e.getValue().s.close();
      } catch (IOException ioe) {
        LOG.warn("Closing scanner " + e.getKey(), ioe);
      }
    }
  }

  // Directly invoked only for testing
  public RSRpcServices(final HRegionServer rs) throws IOException {
    super(rs, rs.getProcessName());
    final Configuration conf = rs.getConfiguration();
    setReloadableGuardrails(conf);
    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
    rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
    closedScanners = CacheBuilder.newBuilder()
      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }

  @Override
  protected boolean defaultReservoirEnabled() {
    return true;
  }

  @Override
  protected ServerType getDNSServerType() {
    return DNS.ServerType.REGIONSERVER;
  }

  @Override
  protected String getHostname(Configuration conf, String defaultHostname) {
    return conf.get("hbase.regionserver.ipc.address", defaultHostname);
  }

  @Override
  protected String getPortConfigName() {
    return HConstants.REGIONSERVER_PORT;
  }

  @Override
  protected int getDefaultPort() {
    return HConstants.DEFAULT_REGIONSERVER_PORT;
  }

  @Override
  protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  /**
   * Create the RPC server listening on the given bind address; translates a BindException into an
   * IOException that points the operator at the port configuration property.
   */
  protected RpcServerInterface createRpcServer(final Server server,
    final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
    final String name) throws IOException {
    final Configuration conf = server.getConfiguration();
    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
    try {
      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, // use final
                                                                                        // bindAddress
                                                                                        // for this
                                                                                        // server.
        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
    } catch (BindException be) {
      throw new IOException(be.getMessage() + ". To switch ports use the '"
        + HConstants.REGIONSERVER_PORT + "' configuration property.",
        be.getCause() != null ? be.getCause() : be);
    }
  }

  protected Class<?> getRpcSchedulerFactoryClass() {
    final Configuration conf = server.getConfiguration();
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  protected PriorityFunction createPriority() {
    return new RSAnnotationReadingPriorityFunction(this);
  }

  public int getScannersCount() {
    return scanners.size();
  }

  /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
  RegionScanner getScanner(long scannerId) {
    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
    return rsh == null ? null : rsh.s;
  }

  /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null. */
  private RegionScannerHolder getRegionScannerHolder(long scannerId) {
    return scanners.get(toScannerName(scannerId));
  }

  public String getScanDetailsWithId(long scannerId) {
    RegionScanner scanner = getScanner(scannerId);
    if (scanner == null) {
      return null;
    }
    StringBuilder builder = new StringBuilder();
    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
    builder.append(" operation_id: ").append(scanner.getOperationId());
    return builder.toString();
  }

  public String getScanDetailsWithRequest(ScanRequest request) {
    try {
      if (!request.hasRegion()) {
        return null;
      }
      Region region = getRegion(request.getRegion());
      StringBuilder builder = new StringBuilder();
      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
      for (NameBytesPair pair : request.getScan().getAttributeList()) {
        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
          break;
        }
      }
      return builder.toString();
    } catch (IOException ignored) {
      return null;
    }
  }

  /**
   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
   */
  long getScannerVirtualTime(long scannerId) {
    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
    return rsh == null ? 0L : rsh.getNextCallSeq();
  }

  /**
   * Method to account for the size of retained cells in the RPC response, by incrementing the
   * context's response cell size for every cell in the result.
   * @param context rpc call context; no-op if null
   * @param r       result to add size; no-op if null or empty
   */
  void addSize(RpcCallContext context, Result r) {
    if (context != null && r != null && !r.isEmpty()) {
      for (Cell c : r.rawCells()) {
        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
      }
    }
  }

  /**
   * Returns Remote client's ip and port, or the empty string if it can't be determined.
   */
  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
  static String getRemoteClientIpAndPort() {
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    if (rpcCall == null) {
      return HConstants.EMPTY_STRING;
    }
    InetAddress address = rpcCall.getRemoteAddress();
    if (address == null) {
      return HConstants.EMPTY_STRING;
    }
    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
    // scanning than a hostname anyways.
    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
  }

  /**
   * Returns Remote client's username, or the empty string if it can't be determined.
   */
  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
  static String getUserName() {
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    if (rpcCall == null) {
      return HConstants.EMPTY_STRING;
    }
    return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING);
  }

  /**
   * Register a new scanner under its lease; the lease expiring triggers the ScannerListener.
   */
  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
    HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
    Lease lease = server.getLeaseManager().createLease(scannerName, this.scannerLeaseTimeoutPeriod,
      new ScannerListener(scannerName));
    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
    RpcCallback closeCallback =
      s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s);
    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
      needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
      + scannerName + ", " + existing;
    return rsh;
  }

  private boolean isFullRegionScan(Scan scan, HRegion region) {
    // If the scan start row equals or less than the start key of the region
    // and stop row greater than equals end key (if stop row present)
    // or if the stop row is empty
    // account this as a full region scan
    if (
      Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0
        && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0
          && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)
          || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
    ) {
      return true;
    }
    return false;
  }

  /**
   * Find the HRegion based on a region specifier
   * @param regionSpecifier the region specifier
   * @return the corresponding region
   * @throws IOException if the specifier is not null, but failed to find the region
   */
  public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException {
    return server.getRegion(regionSpecifier.getValue().toByteArray());
  }

  /**
   * Find the List of HRegions based on a list of region specifiers. Regions that are not being
   * served are skipped; the NotServingRegionException is recorded in {@code stats} instead.
   * @param regionSpecifiers the list of region specifiers
   * @return the corresponding list of regions
   */
  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
    final CacheEvictionStatsBuilder stats) {
    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
    for (RegionSpecifier regionSpecifier : regionSpecifiers) {
      try {
        regions.add(server.getRegion(regionSpecifier.getValue().toByteArray()));
      } catch (NotServingRegionException e) {
        stats.addException(regionSpecifier.getValue().toByteArray(), e);
      }
    }
    return regions;
  }
1396 1397 PriorityFunction getPriority() { 1398 return priority; 1399 } 1400 1401 private RegionServerRpcQuotaManager getRpcQuotaManager() { 1402 return server.getRegionServerRpcQuotaManager(); 1403 } 1404 1405 private RegionServerSpaceQuotaManager getSpaceQuotaManager() { 1406 return server.getRegionServerSpaceQuotaManager(); 1407 } 1408 1409 void start(ZKWatcher zkWatcher) { 1410 this.scannerIdGenerator = new ScannerIdGenerator(this.server.getServerName()); 1411 internalStart(zkWatcher); 1412 } 1413 1414 void stop() { 1415 closeAllScanners(); 1416 internalStop(); 1417 } 1418 1419 /** 1420 * Called to verify that this server is up and running. 1421 */ 1422 // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name). 1423 protected void checkOpen() throws IOException { 1424 if (server.isAborted()) { 1425 throw new RegionServerAbortedException("Server " + server.getServerName() + " aborting"); 1426 } 1427 if (server.isStopped()) { 1428 throw new RegionServerStoppedException("Server " + server.getServerName() + " stopping"); 1429 } 1430 if (!server.isDataFileSystemOk()) { 1431 throw new RegionServerStoppedException("File system not available"); 1432 } 1433 if (!server.isOnline()) { 1434 throw new ServerNotRunningYetException( 1435 "Server " + server.getServerName() + " is not running yet"); 1436 } 1437 } 1438 1439 /** 1440 * By default, put up an Admin and a Client Service. Set booleans 1441 * <code>hbase.regionserver.admin.executorService</code> and 1442 * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services. 1443 * Default is that both are enabled. 
   * @return immutable list of blocking services and the security info classes that this server
   *         supports
   */
  protected List<BlockingServiceAndInterface> getServices() {
    boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
    boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
    boolean clientMeta =
      getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
    if (client) {
      bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
        ClientService.BlockingInterface.class));
    }
    if (admin) {
      bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
        AdminService.BlockingInterface.class));
    }
    if (clientMeta) {
      bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
        ClientMetaService.BlockingInterface.class));
    }
    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
  }

  /**
   * Close a region on the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public CloseRegionResponse closeRegion(final RpcController controller,
    final CloseRegionRequest request) throws ServiceException {
    // Optional destination: when set, the close is part of a move and the master tells us where
    // the region is headed.
    final ServerName sn = (request.hasDestinationServer()
      ? ProtobufUtil.toServerName(request.getDestinationServer())
      : null);

    try {
      checkOpen();
      throwOnWrongStartCode(request);
      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());

      requestCount.increment();
      if (sn == null) {
        LOG.info("Close " + encodedRegionName + " without moving");
      } else {
        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
      }
      boolean closed = server.closeRegion(encodedRegionName, false, sn);
      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Compact a region on the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public CompactRegionResponse compactRegion(final RpcController controller,
    final CompactRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      // Quota support is enabled, the requesting user is not system/super user
      // and a quota policy is enforced that disables compactions.
      if (
        QuotaUtil.isQuotaEnabled(getConfiguration())
          && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null))
          && this.server.getRegionServerSpaceQuotaManager()
            .areCompactionsDisabled(region.getTableDescriptor().getTableName())
      ) {
        throw new DoNotRetryIOException(
          "Compactions on this region are " + "disabled due to a space quota violation.");
      }
      region.startRegionOperation(Operation.COMPACT_REGION);
      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
      // major is only true when the request explicitly asks for a major compaction.
      boolean major = request.hasMajor() && request.getMajor();
      if (request.hasFamily()) {
        // Family-scoped compaction request.
        byte[] family = request.getFamily().toByteArray();
        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
          + region.getRegionInfo().getRegionNameAsString() + " and family "
          + Bytes.toString(family);
        LOG.trace(log);
        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
          CompactionLifeCycleTracker.DUMMY);
      } else {
        String log = "User-triggered " + (major ?
"major " : "") + "compaction for region " 1537 + region.getRegionInfo().getRegionNameAsString(); 1538 LOG.trace(log); 1539 region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY); 1540 } 1541 return CompactRegionResponse.newBuilder().build(); 1542 } catch (IOException ie) { 1543 throw new ServiceException(ie); 1544 } 1545 } 1546 1547 @Override 1548 public CompactionSwitchResponse compactionSwitch(RpcController controller, 1549 CompactionSwitchRequest request) throws ServiceException { 1550 rpcPreCheck("compactionSwitch"); 1551 final CompactSplit compactSplitThread = server.getCompactSplitThread(); 1552 requestCount.increment(); 1553 boolean prevState = compactSplitThread.isCompactionsEnabled(); 1554 CompactionSwitchResponse response = 1555 CompactionSwitchResponse.newBuilder().setPrevState(prevState).build(); 1556 if (prevState == request.getEnabled()) { 1557 // passed in requested state is same as current state. No action required 1558 return response; 1559 } 1560 compactSplitThread.switchCompaction(request.getEnabled()); 1561 return response; 1562 } 1563 1564 /** 1565 * Flush a region on the region server. 1566 * @param controller the RPC controller 1567 * @param request the request 1568 */ 1569 @Override 1570 @QosPriority(priority = HConstants.ADMIN_QOS) 1571 public FlushRegionResponse flushRegion(final RpcController controller, 1572 final FlushRegionRequest request) throws ServiceException { 1573 try { 1574 checkOpen(); 1575 requestCount.increment(); 1576 HRegion region = getRegion(request.getRegion()); 1577 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); 1578 boolean shouldFlush = true; 1579 if (request.hasIfOlderThanTs()) { 1580 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs(); 1581 } 1582 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); 1583 if (shouldFlush) { 1584 boolean writeFlushWalMarker = 1585 request.hasWriteFlushWalMarker() ? 
request.getWriteFlushWalMarker() : false; 1586 // Go behind the curtain so we can manage writing of the flush WAL marker 1587 HRegion.FlushResultImpl flushResult = null; 1588 if (request.hasFamily()) { 1589 List families = new ArrayList(); 1590 families.add(request.getFamily().toByteArray()); 1591 flushResult = 1592 region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1593 } else { 1594 flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1595 } 1596 boolean compactionNeeded = flushResult.isCompactionNeeded(); 1597 if (compactionNeeded) { 1598 server.getCompactSplitThread().requestSystemCompaction(region, 1599 "Compaction through user triggered flush"); 1600 } 1601 builder.setFlushed(flushResult.isFlushSucceeded()); 1602 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker); 1603 } 1604 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores()); 1605 return builder.build(); 1606 } catch (DroppedSnapshotException ex) { 1607 // Cache flush can fail in a few places. If it fails in a critical 1608 // section, we get a DroppedSnapshotException and a replay of wal 1609 // is required. Currently the only way to do this is a restart of 1610 // the server. 1611 server.abort("Replay of WAL required. 
Forcing server shutdown", ex); 1612 throw new ServiceException(ex); 1613 } catch (IOException ie) { 1614 throw new ServiceException(ie); 1615 } 1616 } 1617 1618 @Override 1619 @QosPriority(priority = HConstants.ADMIN_QOS) 1620 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, 1621 final GetOnlineRegionRequest request) throws ServiceException { 1622 try { 1623 checkOpen(); 1624 requestCount.increment(); 1625 Map<String, HRegion> onlineRegions = server.getOnlineRegions(); 1626 List<RegionInfo> list = new ArrayList<>(onlineRegions.size()); 1627 for (HRegion region : onlineRegions.values()) { 1628 list.add(region.getRegionInfo()); 1629 } 1630 list.sort(RegionInfo.COMPARATOR); 1631 return ResponseConverter.buildGetOnlineRegionResponse(list); 1632 } catch (IOException ie) { 1633 throw new ServiceException(ie); 1634 } 1635 } 1636 1637 // Master implementation of this Admin Service differs given it is not 1638 // able to supply detail only known to RegionServer. See note on 1639 // MasterRpcServers#getRegionInfo. 
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
    final GetRegionInfoRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      RegionInfo info = region.getRegionInfo();
      byte[] bestSplitRow;
      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
        bestSplitRow = region.checkSplit(true).orElse(null);
        // when all table data are in memstore, bestSplitRow = null
        // try to flush region first
        if (bestSplitRow == null) {
          region.flush(true);
          bestSplitRow = region.checkSplit(true).orElse(null);
        }
      } else {
        bestSplitRow = null;
      }
      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
      if (request.hasCompactionState() && request.getCompactionState()) {
        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
      }
      builder.setSplittable(region.isSplittable());
      builder.setMergeable(region.isMergeable());
      // Only set the split row when it was asked for and one could be computed.
      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
    throws ServiceException {

    // When a table name is given, report only that table's regions; otherwise all regions.
    List<HRegion> regions;
    if (request.hasTableName()) {
      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
      regions = server.getRegions(tableName);
    } else {
      regions = server.getRegions();
    }
    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
    // Builders are reused across regions by createRegionLoad.
    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();

    try {
      for (HRegion region : regions) {
        rLoads.add(server.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
      }
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
    builder.addAllRegionLoads(rLoads);
    return builder.build();
  }

  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
    ClearCompactionQueuesRequest request) throws ServiceException {
    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
      + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
    requestCount.increment();
    // CAS guard: only one clear operation may run at a time on this server.
    if (clearCompactionQueues.compareAndSet(false, true)) {
      final CompactSplit compactSplitThread = server.getCompactSplitThread();
      try {
        checkOpen();
        server.getRegionServerCoprocessorHost().preClearCompactionQueues();
        for (String queueName : request.getQueueNameList()) {
          LOG.debug("clear " + queueName + " compaction queue");
          switch (queueName) {
            case "long":
              compactSplitThread.clearLongCompactionsQueue();
              break;
            case "short":
              compactSplitThread.clearShortCompactionsQueue();
              break;
            default:
              LOG.warn("Unknown queue name " + queueName);
              throw new IOException("Unknown queue name " + queueName);
          }
        }
        server.getRegionServerCoprocessorHost().postClearCompactionQueues();
      } catch (IOException ie) {
        throw new ServiceException(ie);
      } finally {
        // Always release the guard so later clear requests can proceed.
        clearCompactionQueues.set(false);
      }
    }
    else {
      // Another admin currently holds the clear-queues guard; this request becomes a no-op.
      LOG.warn("Clear compactions queue is executing by other admin.");
    }
    return respBuilder.build();
  }

  /**
   * Get some information of the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetServerInfoResponse getServerInfo(final RpcController controller,
    final GetServerInfoRequest request) throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
    requestCount.increment();
    // -1 signals that no info server is running.
    int infoPort = server.getInfoServer() != null ? server.getInfoServer().getPort() : -1;
    return ResponseConverter.buildGetServerInfoResponse(server.getServerName(), infoPort);
  }

  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetStoreFileResponse getStoreFile(final RpcController controller,
    final GetStoreFileRequest request) throws ServiceException {
    try {
      checkOpen();
      HRegion region = getRegion(request.getRegion());
      requestCount.increment();
      // No families in the request means: list store files for every family of the table.
      Set<byte[]> columnFamilies;
      if (request.getFamilyCount() == 0) {
        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
      } else {
        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
        for (ByteString cf : request.getFamilyList()) {
          columnFamilies.add(cf.toByteArray());
        }
      }
      int nCF = columnFamilies.size();
      List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
      builder.addAllStoreFile(fileList);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  // Validates the start code on an open request; requests without one are only logged.
  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  // Validates the start code on a close request; requests without one are only logged.
  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
    // check that we are the same server that this RPC is intended for.
    if (server.getServerName().getStartcode() != serverStartCode) {
      throw new ServiceException(new DoNotRetryIOException(
        "This RPC was intended for a " + "different server with startCode: " + serverStartCode
          + ", this server is: " + server.getServerName()));
    }
  }

  // Validates start codes on every open/close sub-request of an ExecuteProcedures call.
  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
    if (req.getOpenRegionCount() > 0) {
      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
        throwOnWrongStartCode(openReq);
      }
    }
    if (req.getCloseRegionCount() > 0) {
      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
        throwOnWrongStartCode(closeReq);
      }
    }
  }

  /**
   * Open asynchronously a region or a set of regions on the region server. The opening is
   * coordinated by ZooKeeper, and this method requires the znode to be created before being called.
   * As a consequence, this method should be called only from the master.
   * <p>
   * Different manages states for the region are:
   * </p>
   * <ul>
   * <li>region not opened: the region opening will start asynchronously.</li>
   * <li>a close is already in progress: this is considered as an error.</li>
   * <li>an open is already in progress: this new open request will be ignored.
This is important
   * because the Master can do multiple requests if it crashes.</li>
   * <li>the region is already opened: this new open request will be ignored.</li>
   * </ul>
   * <p>
   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
   * errors are put in the response as FAILED_OPENING.
   * </p>
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public OpenRegionResponse openRegion(final RpcController controller,
    final OpenRegionRequest request) throws ServiceException {
    requestCount.increment();
    throwOnWrongStartCode(request);

    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
    final int regionCount = request.getOpenInfoCount();
    // Cache of table descriptors so the same table is only looked up once per request.
    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
    final boolean isBulkAssign = regionCount > 1;
    try {
      checkOpen();
    } catch (IOException ie) {
      // A single-region request for hbase:meta is allowed to wait for the server to come online;
      // everything else fails immediately.
      TableName tableName = null;
      if (regionCount == 1) {
        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
          request.getOpenInfo(0).getRegion();
        if (ri != null) {
          tableName = ProtobufUtil.toTableName(ri.getTableName());
        }
      }
      if (!TableName.META_TABLE_NAME.equals(tableName)) {
        throw new ServiceException(ie);
      }
      // We are assigning meta, wait a little for regionserver to finish initialization.
      // Default to quarter of RPC timeout
      int timeout = server.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
      synchronized (server.online) {
        try {
          while (
            EnvironmentEdgeManager.currentTime() <= endTime && !server.isStopped()
              && !server.isOnline()
          ) {
            server.online.wait(server.getMsgInterval());
          }
          checkOpen();
        } catch (InterruptedException t) {
          // Restore the interrupt flag before surfacing the failure.
          Thread.currentThread().interrupt();
          throw new ServiceException(t);
        } catch (IOException e) {
          throw new ServiceException(e);
        }
      }
    }

    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;

    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableDescriptor htd;
      try {
        String encodedName = region.getEncodedName();
        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
        final HRegion onlineRegion = server.getRegion(encodedName);
        if (onlineRegion != null) {
          // The region is already online. This should not happen any more.
          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
            + ", which is already online";
          LOG.warn(error);
          // server.abort(error);
          // throw new IOException(error);
          builder.addOpeningState(RegionOpeningState.OPENED);
          continue;
        }
        LOG.info("Open " + region.getRegionNameAsString());

        // previous == null: no transition in progress; FALSE: a close was in progress;
        // TRUE: an open is already in progress.
        final Boolean previous =
          server.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);

        if (Boolean.FALSE.equals(previous)) {
          if (server.getRegion(encodedName) != null) {
            // There is a close in progress. This should not happen any more.
            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
              + ", which we are already trying to CLOSE";
            server.abort(error);
            throw new IOException(error);
          }
          server.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
        }

        if (Boolean.TRUE.equals(previous)) {
          // An open is in progress. This is supported, but let's log this.
          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
            + ", which we are already trying to OPEN"
            + " - ignoring this new request for this region.");
        }

        // We are opening this region. If it moves back and forth for whatever reason, we don't
        // want to keep returning the stale moved record while we are opening/if we close again.
        server.removeFromMovedRegions(region.getEncodedName());

        if (previous == null || !previous.booleanValue()) {
          htd = htds.get(region.getTable());
          if (htd == null) {
            htd = server.getTableDescriptors().get(region.getTable());
            htds.put(region.getTable(), htd);
          }
          if (htd == null) {
            throw new IOException("Missing table descriptor for " + region.getEncodedName());
          }
          // If there is no action in progress, we can submit a specific handler.
          // Need to pass the expected version in the constructor.
          if (server.getExecutorService() == null) {
            LOG.info("No executor executorService; skipping open request");
          } else {
            if (region.isMetaRegion()) {
              server.getExecutorService()
                .submit(new OpenMetaHandler(server, server, region, htd, masterSystemTime));
            } else {
              if (regionOpenInfo.getFavoredNodesCount() > 0) {
                server.updateRegionFavoredNodesMapping(region.getEncodedName(),
                  regionOpenInfo.getFavoredNodesList());
              }
              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
                server.getExecutorService().submit(
                  new OpenPriorityRegionHandler(server, server, region, htd, masterSystemTime));
              } else {
                server.getExecutorService()
                  .submit(new OpenRegionHandler(server, server, region, htd, masterSystemTime));
              }
            }
          }
        }

        builder.addOpeningState(RegionOpeningState.OPENED);
      } catch (IOException ie) {
        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
        if (isBulkAssign) {
          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
        } else {
          throw new ServiceException(ie);
        }
      }
    }
    return builder.build();
  }

  /**
   * Warmup a region on this server. This method should only be called by Master. It synchronously
   * opens the region and closes the region bringing the most important pages in cache.
   */
  @Override
  public WarmupRegionResponse warmupRegion(final RpcController controller,
    final WarmupRegionRequest request) throws ServiceException {
    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
    try {
      checkOpen();
      String encodedName = region.getEncodedName();
      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
      final HRegion onlineRegion = server.getRegion(encodedName);
      // Warmup is pointless if the region is already open here.
      if (onlineRegion != null) {
        LOG.info("{} is online; skipping warmup", region);
        return response;
      }
      TableDescriptor htd = server.getTableDescriptors().get(region.getTable());
      // Don't interfere with a region that is being opened or closed on this server.
      if (server.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
        LOG.info("{} is in transition; skipping warmup", region);
        return response;
      }
      LOG.info("Warmup {}", region.getRegionNameAsString());
      HRegion.warmupHRegion(region, htd, server.getWAL(region), server.getConfiguration(), server,
        null);
    } catch (IOException ie) {
      LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
      throw new ServiceException(ie);
    }

    return response;
  }

  // Detaches and returns the cell scanner from the controller so it is consumed exactly once.
  private CellScanner getAndReset(RpcController controller) {
    HBaseRpcController hrc = (HBaseRpcController) controller;
    CellScanner cells = hrc.cellScanner();
    hrc.setCellScanner(null);
    return cells;
  }

  /**
   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
   * that the given mutations will be durable on the receiving RS if this method returns without any
   * exception.
   * @param controller the RPC controller
   * @param request    the request
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for
   *             compatibility with old region replica implementation.
Now we will use
   *             {@code replicateToReplica} method instead.
   */
  @Deprecated
  @Override
  @QosPriority(priority = HConstants.REPLAY_QOS)
  public ReplicateWALEntryResponse replay(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    CellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      // All entries in one request must target the same region; the first entry decides which.
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      RegionCoprocessorHost coprocessorHost =
        ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
          ? region.getCoprocessorHost()
          : null; // do not invoke coprocessors if this is a secondary region replica
      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();

      // Skip adding the edits to WAL if this is a secondary region replica
      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;

      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException("Replay request contains entries from multiple "
            + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
            + entry.getKey().getEncodedRegionName());
        }
        // Report nonces seen in the WAL so the nonce manager can dedupe retried operations.
        if (server.nonceManager != null && isPrimary) {
          long nonceGroup =
            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          server.nonceManager.reportOperationFromWal(nonceGroup, nonce,
            entry.getKey().getWriteTime());
        }
        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
        List<MutationReplay> edits =
          WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
        if (coprocessorHost != null) {
          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
          // KeyValue.
          if (
            coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
              walEntry.getSecond())
          ) {
            // if bypass this log entry, ignore it ...
            continue;
          }
          walEntries.add(walEntry);
        }
        if (edits != null && !edits.isEmpty()) {
          // HBASE-17924
          // sort to improve lock efficiency
          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
          long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
            ? entry.getKey().getOrigSequenceNumber()
            : entry.getKey().getLogSequenceNumber();
          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
          // check if it's a partial success
          for (int i = 0; result != null && i < result.length; i++) {
            if (result[i] != OperationStatus.SUCCESS) {
              throw new IOException(result[i].getExceptionMsg());
            }
          }
        }
      }

      // sync wal at the end because ASYNC_WAL is used above
      WAL wal = region.getWAL();
      if (wal != null) {
        wal.sync();
      }

      if (coprocessorHost != null) {
        for (Pair<WALKey, WALEdit> entry : walEntries) {
          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
            entry.getSecond());
        }
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record replay latency even on failure.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
      }
    }
  }

  /**
   * Replay the given changes on a secondary replica
   */
  @Override
  public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
    ReplicateWALEntryRequest request) throws ServiceException {
    CellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      // This endpoint is only for secondary replicas; hitting a primary indicates a caller bug.
      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
        throw new DoNotRetryIOException(
          "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
      }
      for (WALEntry entry : entries) {
        // Reject requests that mix entries from multiple regions.
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException(
            "ReplicateToReplica request contains entries from multiple " + "regions. First region:"
              + regionName.toStringUtf8() + " , other region:"
              + entry.getKey().getEncodedRegionName());
        }
        region.replayWALEntry(entry, cells);
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  // Rejects replication when the sink side's sync-replication state says it must not apply edits.
  private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
    ReplicationSourceService replicationSource = server.getReplicationSourceService();
    if (replicationSource == null || entries.isEmpty()) {
      return;
    }
    // We can ensure that all entries are for one peer, so only need to check one entry's
    // table name. if the table hit sync replication at peer side and the peer cluster
    // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply
    // those entries according to the design doc.
    TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray());
    if (
      replicationSource.getSyncReplicationPeerInfoProvider().checkState(table,
        RejectReplicationRequestStateChecker.get())
    ) {
      throw new DoNotRetryIOException(
        "Reject to apply to sink cluster because sync replication state of sink cluster "
          + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table);
    }
  }

  /**
   * Replicate WAL entries on the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.REPLICATION_QOS)
  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
    try {
      checkOpen();
      if (server.getReplicationSinkService() != null) {
        requestCount.increment();
        List<WALEntry> entries = request.getEntryList();
        checkShouldRejectReplicationRequest(entries);
        CellScanner cellScanner = getAndReset(controller);
        server.getRegionServerCoprocessorHost().preReplicateLogEntries();
        server.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
          request.getSourceHFileArchiveDirPath());
        server.getRegionServerCoprocessorHost().postReplicateLogEntries();
        return ReplicateWALEntryResponse.newBuilder().build();
      } else {
        throw new ServiceException("Replication services are not initialized yet");
      }
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Roll the WAL writer of the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  public RollWALWriterResponse rollWALWriter(final RpcController controller,
    final RollWALWriterRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      server.getRegionServerCoprocessorHost().preRollWALWriterRequest();
      server.getWalRoller().requestRollAll();
      server.getRegionServerCoprocessorHost().postRollWALWriterRequest();
      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Stop the region server.
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public StopServerResponse stopServer(final RpcController controller,
    final StopServerRequest request) throws ServiceException {
    // Common pre-flight validation for admin-style RPCs; throws on failure (see rpcPreCheck).
    rpcPreCheck("stopServer");
    requestCount.increment();
    String reason = request.getReason();
    server.stop(reason);
    return StopServerResponse.newBuilder().build();
  }

  /**
   * Update the favored-nodes mapping for each region named in the request.
   * @return response whose {@code response} field is the number of update entries received
   *         (entries without favored nodes are skipped but still counted)
   */
  @Override
  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
    UpdateFavoredNodesRequest request) throws ServiceException {
    rpcPreCheck("updateFavoredNodes");
    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
      // Only apply entries that actually carry favored nodes.
      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
        server.updateRegionFavoredNodesMapping(hri.getEncodedName(),
          regionUpdateInfo.getFavoredNodesList());
      }
    }
    respBuilder.setResponse(openInfoList.size());
    return respBuilder.build();
  }

  /**
   * Atomically bulk load several HFiles into an open region
   * @return true if successful, false is failed but recoverably (no action)
   * @throws ServiceException if failed unrecoverably
   */
  @Override
  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
    final BulkLoadHFileRequest request) throws ServiceException {
    long start = EnvironmentEdgeManager.currentTime();
    // If this cluster's id is already in the request's cluster-id list, the load has
    // presumably already been applied here (replication cycle) — report success without
    // re-loading. Otherwise record our id so downstream clusters can detect the cycle.
    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
    if (clusterIds.contains(this.server.getClusterId())) {
      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
    } else {
      clusterIds.add(this.server.getClusterId());
    }
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
      // -1 means "size unknown / quota not consulted"; see the negative-size note below.
      long sizeToBeLoaded = -1;

      // Check to see if this bulk load would exceed the space quota for this table
      if (spaceQuotaEnabled) {
        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
        SpaceViolationPolicyEnforcement enforcement =
          activeSpaceQuotas.getPolicyEnforcement(region);
        if (enforcement != null) {
          // Bulk loads must still be atomic. We must enact all or none.
          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
          for (FamilyPath familyPath : request.getFamilyPathList()) {
            filePaths.add(familyPath.getPath());
          }
          // Check if the batch of files exceeds the current quota
          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
        }
      }
      // secure bulk load
      Map<byte[], List<Path>> map =
        server.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
      builder.setLoaded(map != null);
      if (map != null) {
        // Treat any negative size as a flag to "ignore" updating the region size as that is
        // not possible to occur in real life (cannot bulk load a file with negative size)
        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
              + sizeToBeLoaded + " bytes");
          }
          // Inform space quotas of the new files for this region
          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
            sizeToBeLoaded);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      throw new
      ServiceException(ie);
    } finally {
      // Record bulk-load latency whether the load succeeded or failed.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
      }
    }
  }

  /**
   * Prepare a bulk load for the given region.
   * @return response carrying the bulk token issued by the SecureBulkLoadManager, to be used in
   *         subsequent bulk load calls
   */
  @Override
  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
    PrepareBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      String bulkToken = server.getSecureBulkLoadManager().prepareBulkLoad(region, request);
      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
      builder.setBulkToken(bulkToken);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Release the server-side state associated with an earlier prepareBulkLoad call.
   */
  @Override
  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
    CleanupBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      server.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
      return CleanupBulkLoadResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Execute a region coprocessor endpoint call and return its serialized result.
   */
  @Override
  public CoprocessorServiceResponse execService(final RpcController controller,
    final CoprocessorServiceRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      Message result = execServiceOnRegion(region, request.getCall());
      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
        region.getRegionInfo().getRegionName()));
      // TODO: COPIES!!!!!! (result is serialized to byte[] and then copied again into ByteString)
      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
        org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Pick the filesystem used to size bulk-load files: the server's own filesystem when no paths
   * are given, otherwise the filesystem of the first source path.
   */
  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
    if (filePaths.isEmpty()) {
      // local hdfs
      return server.getFileSystem();
    }
    // source hdfs
    return new Path(filePaths.get(0)).getFileSystem(server.getConfiguration());
  }

  /**
   * Invoke a coprocessor endpoint on the given region.
   */
  private Message execServiceOnRegion(HRegion region,
    final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
    // ignore the passed in controller (from the serialized call)
    ServerRpcController execController = new ServerRpcController();
    return region.execService(execController, serviceCall);
  }

  /**
   * Whether client requests against this region must be rejected because the region's table is
   * in sync-replication STANDBY state on this cluster (see the checker's name and the message
   * thrown by rejectIfInStandByState).
   */
  private boolean shouldRejectRequestsFromClient(HRegion region) {
    TableName table = region.getRegionInfo().getTable();
    ReplicationSourceService service = server.getReplicationSourceService();
    return service != null && service.getSyncReplicationPeerInfoProvider().checkState(table,
      RejectRequestsFromClientStateChecker.get());
  }

  /**
   * Throws when the region must not serve client requests because it is in STANDBY state.
   * @throws DoNotRetryIOException if the region's table is in sync-replication STANDBY state
   */
  private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
    if (shouldRejectRequestsFromClient(region)) {
      throw new DoNotRetryIOException(
        region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state.");
    }
  }

  /**
   * Get data from a table.
   * @param controller the RPC controller
   * @param request the get request
   */
  @Override
  public GetResponse get(final RpcController controller, final GetRequest request)
    throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    OperationQuota quota = null;
    HRegion region = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    try {
      checkOpen();
      requestCount.increment();
      rpcGetRequestCount.increment();
      region = getRegion(request.getRegion());
      rejectIfInStandByState(region);

      GetResponse.Builder builder = GetResponse.newBuilder();
      ClientProtos.Get get = request.getGet();
      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
          + "reverse Scan.");
      }
      Boolean existence = null;
      Result r = null;
      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);

      Get clientGet = ProtobufUtil.toGet(get);
      // Existence-only gets may be answered entirely by the preExists coprocessor hook.
      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
        existence = region.getCoprocessorHost().preExists(clientGet);
      }
      if (existence == null) {
        if (context != null) {
          r = get(clientGet, (region), null, context);
        } else {
          // for test purpose
          r = region.get(clientGet);
        }
        if (get.getExistenceOnly()) {
          boolean exists = r.getExists();
          if (region.getCoprocessorHost() != null) {
            exists = region.getCoprocessorHost().postExists(clientGet, exists);
          }
          existence = exists;
        }
      }
      if (existence != null) {
        // Existence-only answer: no cell data, just the boolean (stale if from a replica).
        ClientProtos.Result pbr =
          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
        builder.setResult(pbr);
      } else if (r != null) {
        ClientProtos.Result pbr;
        // Ship cells out-of-band in a cell block when the client supports it (client version
        // >= 1.3); otherwise embed the cell data inside the protobuf Result.
        if (
          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
        ) {
          pbr = ProtobufUtil.toResultNoData(r);
          ((HBaseRpcController) controller)
            .setCellScanner(CellUtil.createCellScanner(r.rawCells()));
          addSize(context, r);
        } else {
          pbr = ProtobufUtil.toResult(r);
        }
        builder.setResult(pbr);
      }
      // r.cells is null when an table.exists(get) call
      if (r != null && r.rawCells() != null) {
        quota.addGetResult(r);
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record latency/block-bytes metrics and release the quota on every exit path.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null && region != null) {
        long blockBytesScanned = context != null ? context.getBlockBytesScanned() : 0;
        metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
          blockBytesScanned);
      }
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Server-side implementation of a single Get, routed through a region scanner so that the
   * scanner can be closed later by RPC callback machinery.
   * @param get the client Get
   * @param region target region
   * @param closeCallBack scanner-closing callback shared by multi(); when null, {@code context}
   *          must be non-null and receives the scanner as its call-back instead
   * @param context the current RPC call context
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
    RpcCallContext context) throws IOException {
    region.prepareGet(get);
    // Results served by a non-default replica are flagged stale.
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    // pre-get CP hook; returning true means the hook already produced the results
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        region.metricsUpdateForGet();
        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
          stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      // The scanner is not closed here; it is handed to a callback that closes it after the
      // response (and its cell references) have been shipped.
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet();

    return Result.create(results, get.isCheckExistenceOnly() ?
      !results.isEmpty() : null, stale);
  }

  /**
   * Sums the number of actions across all RegionActions and warns — or rejects, when
   * rejectRowsWithSizeOverThreshold is set — if the batch exceeds rowSizeWarnThreshold
   * (HBASE-18023).
   * @throws ServiceException when the batch is over the threshold and rejection is enabled
   */
  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
    int sum = 0;
    String firstRegionName = null;
    for (RegionAction regionAction : request.getRegionActionList()) {
      // Remember the first region's name for the log/error message.
      if (sum == 0) {
        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
      }
      sum += regionAction.getActionCount();
    }
    if (sum > rowSizeWarnThreshold) {
      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
        + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
        + RpcServer.getRequestUserName().orElse(null) + "/"
        + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
      if (rejectRowsWithSizeOverThreshold) {
        throw new ServiceException(
          "Rejecting large batch operation for current batch with firstRegionName: "
            + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
            + rowSizeWarnThreshold);
      }
    }
  }

  /**
   * Records a whole-RegionAction failure in the multi response and skips the cells belonging to
   * its mutations so the shared CellScanner stays aligned for subsequent RegionActions.
   */
  private void failRegionAction(MultiResponse.Builder responseBuilder,
    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
    CellScanner cellScanner, Throwable error) {
    rpcServer.getMetrics().exception(error);
    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
    // All Mutations in this RegionAction not executed as we can not see the Region online here
    // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
    // corresponding to these Mutations.
    if (cellScanner != null) {
      skipCellsForMutations(regionAction.getActionList(), cellScanner);
    }
  }

  /**
   * Whether this action originates from cross-cluster replication rather than a client: it must
   * be a PUT or DELETE mutation carrying the special replication attribute.
   */
  private boolean isReplicationRequest(Action action) {
    // replication request can only be put or delete.
    if (!action.hasMutation()) {
      return false;
    }
    MutationProto mutation = action.getMutation();
    MutationType type = mutation.getMutateType();
    if (type != MutationType.PUT && type != MutationType.DELETE) {
      return false;
    }
    // replication will set a special attribute so we can make use of it to decide whether a request
    // is for replication.
    return mutation.getAttributeList().stream().map(p -> p.getName())
      .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
  }

  /**
   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
   * @param rpcc the RPC controller
   * @param request the multi request
   */
  @Override
  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
    throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }

    checkBatchSizeAndLogLargeSize(request);

    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
    // Clear the controller's scanner so we are not holding the reference across the call.
    if (controller != null) {
      controller.setCellScanner(null);
    }

    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;

    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
    this.rpcMultiRequestCount.increment();
    this.requestCount.increment();
    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();

    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition.
    // The following logic is for backward compatibility as old clients still use
    // MultiRequest#condition in case of checkAndMutate with RowMutations.
    if (request.hasCondition()) {
      if (request.getRegionActionList().isEmpty()) {
        // If the region action list is empty, do nothing.
        responseBuilder.setProcessed(true);
        return responseBuilder.build();
      }

      // Legacy path only looks at the first (and only) RegionAction.
      RegionAction regionAction = request.getRegionAction(0);

      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
      // we can assume regionAction.getAtomic() is true here.
      assert regionAction.getAtomic();

      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
      } catch (IOException e) {
        // Region not servable or quota refused: fail the whole RegionAction and return.
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        return responseBuilder.build();
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
        // We only allow replication in standby state and it will not set the atomic flag.
        if (rejectIfFromClient) {
          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
            new DoNotRetryIOException(
              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
          return responseBuilder.build();
        }

        try {
          CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
            cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
          responseBuilder.setProcessed(result.isSuccess());
          ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
            ClientProtos.ResultOrException.newBuilder();
          for (int i = 0; i < regionAction.getActionCount(); i++) {
            // To unify the response format with doNonAtomicRegionMutation and read through
            // client's AsyncProcess we have to add an empty result instance per operation
            resultOrExceptionOrBuilder.clear();
            resultOrExceptionOrBuilder.setIndex(i);
            regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
          }
        } catch (IOException e) {
          rpcServer.getMetrics().exception(e);
          // As it's an atomic operation with a condition, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        }
      } finally {
        // Quota is always released, even when the action failed.
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
      }
      return responseBuilder.build();
    }

    // this will contain all the cells that we need to return. It's created later, if needed.
    List<CellScannable> cellsToReturn = null;
    RegionScannersCloseCallBack closeCallBack = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
      new HashMap<>(request.getRegionActionCount());

    for (RegionAction regionAction : request.getRegionActionList()) {
      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();
      // The single result builder is reused across RegionActions.
      regionActionResultBuilder.clear();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        continue; // For this region it's a failure.
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);

        if (regionAction.hasCondition()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }

          try {
            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
              ClientProtos.ResultOrException.newBuilder();
            if (regionAction.getActionCount() == 1) {
              // Single-mutation checkAndMutate.
              CheckAndMutateResult result =
                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              resultOrExceptionOrBuilder.setIndex(0);
              if (result.getResult() != null) {
                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
              }
              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
            } else {
              // checkAndMutate over a RowMutations (several actions in one row).
              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              for (int i = 0; i < regionAction.getActionCount(); i++) {
                if (i == 0 && result.getResult() != null) {
                  // Set the result of the Increment/Append operations to the first element of the
                  // ResultOrException list
                  resultOrExceptionOrBuilder.setIndex(i);
                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
                  continue;
                }
                // To unify the response format with doNonAtomicRegionMutation and read through
                // client's AsyncProcess we have to add an empty result instance per operation
                resultOrExceptionOrBuilder.clear();
                resultOrExceptionOrBuilder.setIndex(i);
                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
              }
            }
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's an atomic operation with a condition, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }
          try {
            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
              cellScanner, nonceGroup, spaceQuotaEnforcement);
            regionActionResultBuilder.setProcessed(true);
            // We no longer use MultiResponse#processed. Instead, we use
            // RegionActionResult#processed. This is for backward compatibility for old clients.
            responseBuilder.setProcessed(true);
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's atomic, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else {
          // Non-atomic path: when in STANDBY state, only replication-originated actions pass.
          if (
            rejectIfFromClient && regionAction.getActionCount() > 0
              && !isReplicationRequest(regionAction.getAction(0))
          ) {
            // fail if it is not a replication request
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }
          // doNonAtomicRegionMutation manages the exception internally
          if (context != null && closeCallBack == null) {
            // An RpcCallBack that creates a list of scanners that needs to perform callBack
            // operation on completion of multiGets.
            // Set this only once
            closeCallBack = new RegionScannersCloseCallBack();
            context.setCallBack(closeCallBack);
          }
          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
            spaceQuotaEnforcement);
        }
      } finally {
        // Release the per-RegionAction quota on every path.
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        regionStats.put(regionSpecifier, regionLoadStats);
      }
    }
    // Load the controller with the Cells to return.
    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
    }

    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
      builder.addRegion(stat.getKey());
      builder.addStat(stat.getValue());
    }
    responseBuilder.setRegionStatistics(builder);
    return responseBuilder.build();
  }

  /**
   * Skips, in the shared CellScanner, all cells associated with the given actions' mutations.
   */
  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    for (Action action : actions) {
      skipCellsForMutation(action, cellScanner);
    }
  }

  /**
   * Advances the CellScanner past the cells associated with one action's mutation so the scanner
   * stays aligned after the mutation is skipped.
   */
  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    try {
      if (action.hasMutation()) {
        MutationProto m = action.getMutation();
        if (m.hasAssociatedCellCount()) {
          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
            cellScanner.advance();
          }
        }
      }
    } catch (IOException e) {
      // No need to handle these individual-mutation-level issues. Either way this entire
      // RegionAction is marked as failed as we could not see the Region here. At client side the
      // top level RegionAction exception will be considered first.
      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
    }
  }

  /**
   * Mutate data in a table.
   * @param rpcc the RPC controller
   * @param request the mutate request
   */
  @Override
  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
    throws ServiceException {
    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
    OperationQuota quota = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    // Clear scanner so we are not holding on to reference across call.
    if (controller != null) {
      controller.setCellScanner(null);
    }
    try {
      checkOpen();
      requestCount.increment();
      rpcMutateRequestCount.increment();
      HRegion region = getRegion(request.getRegion());
      rejectIfInStandByState(region);
      MutateResponse.Builder builder = MutateResponse.newBuilder();
      MutationProto mutation = request.getMutation();
      // Give the memstore a chance to reclaim memory before a non-meta write.
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
      ActivePolicyEnforcement spaceQuotaEnforcement =
        getSpaceQuotaManager().getActiveEnforcements();

      if (request.hasCondition()) {
        // Conditional mutation (checkAndMutate).
        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
          request.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
        builder.setProcessed(result.isSuccess());
        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
        if (clientCellBlockSupported) {
          addSize(context, result.getResult());
        }
      } else {
        // Plain mutation: dispatch on the mutation type. Only APPEND/INCREMENT return a Result;
        // PUT/DELETE only report processed=true.
        Result r = null;
        Boolean processed = null;
        MutationType type = mutation.getMutateType();
        switch (type) {
          case APPEND:
            // TODO: this doesn't actually check anything.
            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
              context);
            break;
          case INCREMENT:
            // TODO: this doesn't actually check anything.
            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
              context);
            break;
          case PUT:
            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          case DELETE:
            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          default:
            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
        }
        if (processed != null) {
          builder.setProcessed(processed);
        }
        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
        addResult(builder, r, controller, clientCellBlockSupported);
        if (clientCellBlockSupported) {
          addSize(context, r);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      // An IO failure may indicate filesystem trouble; trigger a filesystem check.
      server.checkFileSystem();
      throw new ServiceException(ie);
    } finally {
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Executes a single Put: size-limit check, space-quota check, quota accounting, then the write,
   * and finally the put latency metric.
   */
  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Put put = ProtobufUtil.toPut(mutation, cellScanner);
    checkCellSizeLimit(region, put);
    spaceQuota.getPolicyEnforcement(region).check(put);
    quota.addMutation(put);
    region.put(put);

    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updatePut(region, after - before);
    }
  }

  /**
   * Executes a single Delete; mirrors put() — checks, quota accounting, write, delete metric.
   */
  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
    checkCellSizeLimit(region, delete);
    spaceQuota.getPolicyEnforcement(region).check(delete);
    quota.addMutation(delete);
    region.delete(delete);

    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updateDelete(region, after - before);
    }
  }

  /**
   * Executes a single-mutation checkAndMutate, honoring the pre/post coprocessor hooks: a
   * non-null result from preCheckAndMutate short-circuits the region call entirely.
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
    long nonce = mutation.hasNonce() ?
mutation.getNonce() : HConstants.NO_NONCE; 3016 checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction()); 3017 spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction()); 3018 quota.addMutation((Mutation) checkAndMutate.getAction()); 3019 3020 CheckAndMutateResult result = null; 3021 if (region.getCoprocessorHost() != null) { 3022 result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate); 3023 } 3024 if (result == null) { 3025 result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce); 3026 if (region.getCoprocessorHost() != null) { 3027 result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result); 3028 } 3029 } 3030 MetricsRegionServer metricsRegionServer = server.getMetrics(); 3031 if (metricsRegionServer != null) { 3032 long after = EnvironmentEdgeManager.currentTime(); 3033 long blockBytesScanned = 3034 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 3035 metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned); 3036 3037 MutationType type = mutation.getMutateType(); 3038 switch (type) { 3039 case PUT: 3040 metricsRegionServer.updateCheckAndPut(region, after - before); 3041 break; 3042 case DELETE: 3043 metricsRegionServer.updateCheckAndDelete(region, after - before); 3044 break; 3045 default: 3046 break; 3047 } 3048 } 3049 return result; 3050 } 3051 3052 // This is used to keep compatible with the old client implementation. Consider remove it if we 3053 // decide to drop the support of the client that still sends close request to a region scanner 3054 // which has already been exhausted. 
@Deprecated
// Shared sentinel thrown when an old client touches a scanner we already auto-closed; the
// RPC handler recognizes it by identity (==) and quietly returns an empty response instead of
// surfacing an error. fillInStackTrace is overridden so the shared instance carries no stack.
private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {

  private static final long serialVersionUID = -4305297078988180130L;

  @Override
  public synchronized Throwable fillInStackTrace() {
    return this;
  }
};

/**
 * Looks up the outstanding scanner for a next/close request and validates it.
 * <p>
 * Throws {@code SCANNER_ALREADY_CLOSED} (identity-compared by the caller) if the scanner was
 * already auto-closed, {@link UnknownScannerException} if it was never known or its lease
 * expired, and {@link NotServingRegionException} if the region instance has changed underneath
 * the scanner (in which case the stale scanner and its lease are cleaned up first).
 */
private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
  String scannerName = toScannerName(request.getScannerId());
  RegionScannerHolder rsh = this.scanners.get(scannerName);
  if (rsh == null) {
    // just ignore the next or close request if scanner does not exists.
    if (closedScanners.getIfPresent(scannerName) != null) {
      throw SCANNER_ALREADY_CLOSED;
    } else {
      LOG.warn("Client tried to access missing scanner " + scannerName);
      throw new UnknownScannerException(
        "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
          + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
          + "long wait between consecutive client checkins, c) Server may be closing down, "
          + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
          + "possible fix would be increasing the value of"
          + "'hbase.client.scanner.timeout.period' configuration.");
    }
  }
  rejectIfInStandByState(rsh.r);
  RegionInfo hri = rsh.s.getRegionInfo();
  // Yes, should be the same instance
  if (server.getOnlineRegion(hri.getRegionName()) != rsh.r) {
    String msg = "Region has changed on the scanner " + scannerName + ": regionName="
      + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r;
    LOG.warn(msg + ", closing...");
    scanners.remove(scannerName);
    try {
      rsh.s.close();
    } catch (IOException e) {
      LOG.warn("Getting exception closing " + scannerName, e);
    } finally {
      // Always try to cancel the lease, even if the close itself failed.
      try {
        server.getLeaseManager().cancelLease(scannerName);
      } catch (LeaseException e) {
        LOG.warn("Getting exception closing " + scannerName, e);
      }
    }
    throw new NotServingRegionException(msg);
  }
  return rsh;
}

/**
 * Opens a new region scanner for a ScanRequest that carries a Scan (no scannerId yet) and
 * registers it with this server, filling the response builder with the new scanner id, mvcc
 * read point, and lease TTL.
 * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
 *         value.
 */
private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request,
  ScanResponse.Builder builder) throws IOException {
  HRegion region = getRegion(request.getRegion());
  rejectIfInStandByState(region);
  ClientProtos.Scan protoScan = request.getScan();
  boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
  Scan scan = ProtobufUtil.toScan(protoScan);
  // if the request doesn't set this, get the default region setting.
  if (!isLoadingCfsOnDemandSet) {
    scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
  }

  if (!scan.hasFamilies()) {
    // Adding all families to scanner
    for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
      scan.addFamily(family);
    }
  }
  if (region.getCoprocessorHost() != null) {
    // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
    // wrapper for the core created RegionScanner
    region.getCoprocessorHost().preScannerOpen(scan);
  }
  RegionScannerImpl coreScanner = region.getScanner(scan);
  // Keep a handle on the core scanner as the Shipper even if a coprocessor wraps the
  // RegionScanner below.
  Shipper shipper = coreScanner;
  RegionScanner scanner = coreScanner;
  try {
    if (region.getCoprocessorHost() != null) {
      scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
    }
  } catch (Exception e) {
    // Although region coprocessor is for advanced users and they should take care of the
    // implementation to not damage the HBase system, closing the scanner on exception here does
    // not have any bad side effect, so let's do it
    scanner.close();
    throw e;
  }
  long scannerId = scannerIdGenerator.generateNewScannerId();
  builder.setScannerId(scannerId);
  builder.setMvccReadPoint(scanner.getMvccReadPoint());
  builder.setTtl(scannerLeaseTimeoutPeriod);
  String scannerName = toScannerName(scannerId);

  boolean fullRegionScan =
    !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region);

  return new Pair<String, RegionScannerHolder>(scannerName,
    addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan));
}

/**
 * The returned String is used as key doing look up of outstanding Scanners in this Servers'
 * this.scanners, the Map of outstanding scanners and their current state.
 * @param scannerId A scanner long id.
 * @return The long id as a String.
 */
private static String toScannerName(long scannerId) {
  return Long.toString(scannerId);
}

/**
 * Verifies the client's next-call sequence number against the scanner's expected value and
 * advances it; a mismatch means an out-of-order/duplicate next() and is rejected.
 * @throws OutOfOrderScannerNextException if the sequence number does not match
 */
private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
  throws OutOfOrderScannerNextException {
  // if nextCallSeq does not match throw Exception straight away. This needs to be
  // performed even before checking of Lease. See HBASE-5974
  if (request.hasNextCallSeq()) {
    long callSeq = request.getNextCallSeq();
    if (!rsh.incNextCallSeq(callSeq)) {
      throw new OutOfOrderScannerNextException(
        "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: "
          + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
    }
  }
}

/**
 * Re-registers a scanner lease that was removed for the duration of request processing.
 * A still-held lease would indicate a duplicate scanner id, which should be impossible.
 */
private void addScannerLeaseBack(LeaseManager.Lease lease) {
  try {
    server.getLeaseManager().addLease(lease);
  } catch (LeaseStillHeldException e) {
    // should not happen as the scanner id is unique.
    throw new AssertionError(e);
  }
}

// visible for testing only
long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller,
  boolean allowHeartbeatMessages) {
  // Set the time limit to be half of the more restrictive timeout value (one of the
  // timeout values must be positive). In the event that both values are positive, the
  // more restrictive of the two is used to calculate the limit.
  if (allowHeartbeatMessages) {
    long now = EnvironmentEdgeManager.currentTime();
    long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now);
    if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) {
      long timeLimitDelta;
      if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) {
        timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout);
      } else {
        timeLimitDelta =
          scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout;
      }

      // Use half of whichever timeout value was more restrictive... But don't allow
      // the time limit to be less than the allowable minimum (could cause an
      // immediate timeout before scanning any data).
      timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
      return now + timeLimitDelta;
    }
  }
  // Default value of timeLimit is negative to indicate no timeLimit should be
  // enforced.
  return -1L;
}

/**
 * Computes how much of the RPC timeout budget remains for this call.
 * Prefers the controller's call timeout, falls back to the server's configured rpcTimeout,
 * and returns -1 when neither is positive (meaning: no RPC-based limit).
 * Queue time already spent is subtracted, and the result is floored at
 * minimumScanTimeLimitDelta so a long queue wait still yields a usable (positive) value.
 */
private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) {
  long timeout;
  if (controller != null && controller.getCallTimeout() > 0) {
    timeout = controller.getCallTimeout();
  } else if (rpcTimeout > 0) {
    timeout = rpcTimeout;
  } else {
    return -1;
  }
  if (call != null) {
    timeout -= (now - call.getReceiveTime());
  }
  // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high.
  // return minimum value here in that case so we count this in calculating the final delta.
  return Math.max(minimumScanTimeLimitDelta, timeout);
}

/**
 * If the number of complete rows returned has reached the client-requested row limit, marks
 * the response as having no more results; otherwise does nothing.
 */
private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
  ScannerContext scannerContext, ScanResponse.Builder builder) {
  if (numOfCompleteRows >= limitOfRows) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows
        + " scannerContext: " + scannerContext);
    }
    builder.setMoreResults(false);
  }
}

/**
 * Core next() loop for a scan RPC: repeatedly calls RegionScanner#nextRaw under size/time/batch
 * limits, accumulating Results into {@code results} and recording outcome flags (heartbeat,
 * cursor, moreResultsInRegion, scan metrics) into {@code builder}.
 * <p>
 * Partial results and heartbeats are only produced when the client declared it can handle them.
 * Scan latency and read-query metrics are updated in the finally block even on error, and the
 * postScannerNext coprocessor hook runs after the region operation is closed.
 */
// return whether we have more results in region.
private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
  long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
  ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
  HRegion region = rsh.r;
  RegionScanner scanner = rsh.s;
  long maxResultSize;
  if (scanner.getMaxResultSize() > 0) {
    maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
  } else {
    maxResultSize = maxQuotaResultSize;
  }
  // This is cells inside a row. Default size is 10 so if many versions or many cfs,
  // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
  // arbitrary 32. TODO: keep record of general size of results being returned.
  ArrayList<Cell> values = new ArrayList<>(32);
  region.startRegionOperation(Operation.SCAN);
  long before = EnvironmentEdgeManager.currentTime();
  // Used to check if we've matched the row limit set on the Scan
  int numOfCompleteRows = 0;
  // Count of times we call nextRaw; can be > numOfCompleteRows.
  int numOfNextRawCalls = 0;
  try {
    int numOfResults = 0;
    synchronized (scanner) {
      boolean stale = (region.getRegionInfo().getReplicaId() != 0);
      boolean clientHandlesPartials =
        request.hasClientHandlesPartials() && request.getClientHandlesPartials();
      boolean clientHandlesHeartbeats =
        request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();

      // On the server side we must ensure that the correct ordering of partial results is
      // returned to the client to allow them to properly reconstruct the partial results.
      // If the coprocessor host is adding to the result list, we cannot guarantee the
      // correct ordering of partial results and so we prevent partial results from being
      // formed.
      boolean serverGuaranteesOrderOfPartials = results.isEmpty();
      boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
      boolean moreRows = false;

      // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
      // certain time threshold on the server. When the time threshold is exceeded, the
      // server stops the scan and sends back whatever Results it has accumulated within
      // that time period (may be empty). Since heartbeat messages have the potential to
      // create partial Results (in the event that the timeout occurs in the middle of a
      // row), we must only generate heartbeat messages when the client can handle both
      // heartbeats AND partials
      boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;

      long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);

      final LimitScope sizeScope =
        allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
      final LimitScope timeScope =
        allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;

      boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();

      // Configure with limits for this RPC. Set keep progress true since size progress
      // towards size limit should be kept between calls to nextRaw
      ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
      // maxResultSize - either we can reach this much size for all cells(being read) data or sum
      // of heap size occupied by cells(being read). Cell data means its key and value parts.
      // maxQuotaResultSize - max results just from server side configuration and quotas, without
      // user's specified max. We use this for evaluating limits based on blocks (not cells).
      // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
      // cell or block size from maximum here so we adhere to total limits of request.
      // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
      // have accumulated block bytes. If not, this will be 0 for block size.
      long maxCellSize = maxResultSize;
      long maxBlockSize = maxQuotaResultSize;
      if (rpcCall != null) {
        maxBlockSize -= rpcCall.getBlockBytesScanned();
        maxCellSize -= rpcCall.getResponseCellSize();
      }

      contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
      contextBuilder.setBatchLimit(scanner.getBatch());
      contextBuilder.setTimeLimit(timeScope, timeLimit);
      contextBuilder.setTrackMetrics(trackMetrics);
      ScannerContext scannerContext = contextBuilder.build();
      boolean limitReached = false;
      while (numOfResults < maxResults) {
        // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
        // batch limit is a limit on the number of cells per Result. Thus, if progress is
        // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
        // reset the batch progress between nextRaw invocations since we don't want the
        // batch progress from previous calls to affect future calls
        scannerContext.setBatchProgress(0);
        assert values.isEmpty();

        // Collect values to be returned here
        moreRows = scanner.nextRaw(values, scannerContext);
        if (rpcCall == null) {
          // When there is no RpcCallContext,copy EC to heap, then the scanner would close,
          // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
          // buffers.See more details in HBASE-26036.
          CellUtil.cloneIfNecessary(values);
        }
        numOfNextRawCalls++;

        if (!values.isEmpty()) {
          if (limitOfRows > 0) {
            // First we need to check if the last result is partial and we have a row change. If
            // so then we need to increase the numOfCompleteRows.
            if (results.isEmpty()) {
              if (
                rsh.rowOfLastPartialResult != null
                  && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
              ) {
                numOfCompleteRows++;
                checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                  builder);
              }
            } else {
              Result lastResult = results.get(results.size() - 1);
              if (
                lastResult.mayHaveMoreCellsInRow()
                  && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
              ) {
                numOfCompleteRows++;
                checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                  builder);
              }
            }
            if (builder.hasMoreResults() && !builder.getMoreResults()) {
              break;
            }
          }
          boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
          Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
          results.add(r);
          numOfResults++;
          if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
            numOfCompleteRows++;
            checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
            if (builder.hasMoreResults() && !builder.getMoreResults()) {
              break;
            }
          }
        } else if (!moreRows && !results.isEmpty()) {
          // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
          // last result is false. Otherwise it's possible that: the first nextRaw returned
          // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
          // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
          // return with moreRows=false, which means moreResultsInRegion would be false, it will
          // be a contradictory state (HBASE-21206).
          int lastIdx = results.size() - 1;
          Result r = results.get(lastIdx);
          if (r.mayHaveMoreCellsInRow()) {
            results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
          }
        }
        boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
        boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
        boolean resultsLimitReached = numOfResults >= maxResults;
        limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

        if (limitReached || !moreRows) {
          // With block size limit, we may exceed size limit without collecting any results.
          // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
          // or cursor if results were collected, for example for cell size or heap size limits.
          boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
          // We only want to mark a ScanResponse as a heartbeat message in the event that
          // there are more values to be read server side. If there aren't more values,
          // marking it as a heartbeat is wasteful because the client will need to issue
          // another ScanRequest only to realize that they already have all the values
          if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
            // Heartbeat messages occur when the time limit has been reached, or size limit has
            // been reached before collecting any results. This can happen for heavily filtered
            // scans which scan over too many blocks.
            builder.setHeartbeatMessage(true);
            if (rsh.needCursor) {
              Cell cursorCell = scannerContext.getLastPeekedCell();
              if (cursorCell != null) {
                builder.setCursor(ProtobufUtil.toCursor(cursorCell));
              }
            }
          }
          break;
        }
        values.clear();
      }
      if (rpcCall != null) {
        rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
      }
      builder.setMoreResultsInRegion(moreRows);
      // Check to see if the client requested that we track metrics server side. If the
      // client requested metrics, retrieve the metrics from the scanner context.
      if (trackMetrics) {
        // rather than increment yet another counter in StoreScanner, just set the value here
        // from block size progress before writing into the response
        scannerContext.getMetrics().countOfBlockBytesScanned
          .set(scannerContext.getBlockSizeProgress());
        Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
        ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
        NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();

        for (Entry<String, Long> entry : metrics.entrySet()) {
          pairBuilder.setName(entry.getKey());
          pairBuilder.setValue(entry.getValue());
          metricBuilder.addMetrics(pairBuilder.build());
        }

        builder.setScanMetrics(metricBuilder.build());
      }
    }
  } finally {
    region.closeRegionOperation();
    // Update serverside metrics, even on error.
    long end = EnvironmentEdgeManager.currentTime();

    long responseCellSize = 0;
    long blockBytesScanned = 0;
    if (rpcCall != null) {
      responseCellSize = rpcCall.getResponseCellSize();
      blockBytesScanned = rpcCall.getBlockBytesScanned();
    }
    region.getMetrics().updateScan();
    final MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned);
      metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls);
    }
  }
  // coprocessor postNext hook
  if (region.getCoprocessorHost() != null) {
    region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
  }
}

/**
 * Scan data in a table.
 * <p>
 * This is the RPC entry point for open/next/renew/close scanner requests. It resolves or creates
 * the RegionScannerHolder, removes the scanner lease for the duration of processing (re-adding
 * it via the shipped callback or on early return), checks quotas and call sequencing, delegates
 * the actual read to the private scan(...) overload, and on IOException closes the scanner and
 * translates the error for the client (DNRIOE passthrough, FNFE wrapping, ScannerResetException
 * for 1.4+ clients, UnknownScannerException for older ones).
 * @param controller the RPC controller
 * @param request the scan request
 */
@Override
public ScanResponse scan(final RpcController controller, final ScanRequest request)
  throws ServiceException {
  if (controller != null && !(controller instanceof HBaseRpcController)) {
    throw new UnsupportedOperationException(
      "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
  }
  if (!request.hasScannerId() && !request.hasScan()) {
    throw new ServiceException(
      new DoNotRetryIOException("Missing required input: scannerId or scan"));
  }
  try {
    checkOpen();
  } catch (IOException e) {
    // Server is going down; best-effort cancel of the scanner's lease before failing the call.
    if (request.hasScannerId()) {
      String scannerName = toScannerName(request.getScannerId());
      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "Server shutting down and client tried to access missing scanner " + scannerName);
      }
      final LeaseManager leaseManager = server.getLeaseManager();
      if (leaseManager != null) {
        try {
          leaseManager.cancelLease(scannerName);
        } catch (LeaseException le) {
          // No problem, ignore
          if (LOG.isTraceEnabled()) {
            LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
          }
        }
      }
    }
    throw new ServiceException(e);
  }
  requestCount.increment();
  rpcScanRequestCount.increment();
  RegionScannerHolder rsh;
  ScanResponse.Builder builder = ScanResponse.newBuilder();
  String scannerName;
  try {
    if (request.hasScannerId()) {
      // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
      // for more details.
      long scannerId = request.getScannerId();
      builder.setScannerId(scannerId);
      scannerName = toScannerName(scannerId);
      rsh = getRegionScanner(request);
    } else {
      Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder);
      scannerName = scannerNameAndRSH.getFirst();
      rsh = scannerNameAndRSH.getSecond();
    }
  } catch (IOException e) {
    if (e == SCANNER_ALREADY_CLOSED) {
      // Now we will close scanner automatically if there are no more results for this region but
      // the old client will still send a close request to us. Just ignore it and return.
      return builder.build();
    }
    throw new ServiceException(e);
  }
  if (rsh.fullRegionScan) {
    rpcFullScanRequestCount.increment();
  }
  HRegion region = rsh.r;
  LeaseManager.Lease lease;
  try {
    // Remove lease while its being processed in server; protects against case
    // where processing of request takes > lease expiration time. or null if none found.
    lease = server.getLeaseManager().removeLease(scannerName);
  } catch (LeaseException e) {
    throw new ServiceException(e);
  }
  if (request.hasRenew() && request.getRenew()) {
    // add back and return
    addScannerLeaseBack(lease);
    try {
      checkScanNextCallSeq(request, rsh);
    } catch (OutOfOrderScannerNextException e) {
      throw new ServiceException(e);
    }
    return builder.build();
  }
  OperationQuota quota;
  try {
    quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
  } catch (IOException e) {
    addScannerLeaseBack(lease);
    throw new ServiceException(e);
  }
  try {
    checkScanNextCallSeq(request, rsh);
  } catch (OutOfOrderScannerNextException e) {
    addScannerLeaseBack(lease);
    throw new ServiceException(e);
  }
  // Now we have increased the next call sequence. If we give client an error, the retry will
  // never success. So we'd better close the scanner and return a DoNotRetryIOException to client
  // and then client will try to open a new scanner.
  boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
  int rows; // this is scan.getCaching
  if (request.hasNumberOfRows()) {
    rows = request.getNumberOfRows();
  } else {
    rows = closeScanner ? 0 : 1;
  }
  RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
  // now let's do the real scan.
  long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
  RegionScanner scanner = rsh.s;
  // this is the limit of rows for this scan, if we the number of rows reach this value, we will
  // close the scanner.
  int limitOfRows;
  if (request.hasLimitOfRows()) {
    limitOfRows = request.getLimitOfRows();
  } else {
    limitOfRows = -1;
  }
  boolean scannerClosed = false;
  try {
    List<Result> results = new ArrayList<>(Math.min(rows, 512));
    if (rows > 0) {
      boolean done = false;
      // Call coprocessor. Get region info from scanner.
      if (region.getCoprocessorHost() != null) {
        Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
        if (!results.isEmpty()) {
          for (Result r : results) {
            // add cell size from CP results so we can track response size and update limits
            // when calling scan below if !done. We'll also have tracked block size if the CP
            // got results from hbase, since StoreScanner tracks that for all calls automatically.
            addSize(rpcCall, r);
          }
        }
        if (bypass != null && bypass.booleanValue()) {
          done = true;
        }
      }
      if (!done) {
        scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
          results, builder, rpcCall);
      } else {
        builder.setMoreResultsInRegion(!results.isEmpty());
      }
    } else {
      // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
      builder.setMoreResultsInRegion(true);
    }

    quota.addScanResult(results);
    addResults(builder, results, (HBaseRpcController) controller,
      RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
      isClientCellBlockSupport(rpcCall));
    if (scanner.isFilterDone() && results.isEmpty()) {
      // If the scanner's filter - if any - is done with the scan
      // only set moreResults to false if the results is empty. This is used to keep compatible
      // with the old scan implementation where we just ignore the returned results if moreResults
      // is false. Can remove the isEmpty check after we get rid of the old implementation.
      builder.setMoreResults(false);
    }
    // Later we may close the scanner depending on this flag so here we need to make sure that we
    // have already set this flag.
    assert builder.hasMoreResultsInRegion();
    // we only set moreResults to false in the above code, so set it to true if we haven't set it
    // yet.
    if (!builder.hasMoreResults()) {
      builder.setMoreResults(true);
    }
    if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
      // Record the last cell of the last result if it is a partial result
      // We need this to calculate the complete rows we have returned to client as the
      // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
      // current row. We may filter out all the remaining cells for the current row and just
      // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
      // check for row change.
      Result lastResult = results.get(results.size() - 1);
      if (lastResult.mayHaveMoreCellsInRow()) {
        rsh.rowOfLastPartialResult = lastResult.getRow();
      } else {
        rsh.rowOfLastPartialResult = null;
      }
    }
    if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
      scannerClosed = true;
      closeScanner(region, scanner, scannerName, rpcCall);
    }

    // There's no point returning to a timed out client. Throwing ensures scanner is closed
    if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) {
      throw new TimeoutIOException("Client deadline exceeded, cannot return results");
    }

    return builder.build();
  } catch (IOException e) {
    try {
      // scanner is closed here
      scannerClosed = true;
      // The scanner state might be left in a dirty state, so we will tell the Client to
      // fail this RPC and close the scanner while opening up another one from the start of
      // row that the client has last seen.
      closeScanner(region, scanner, scannerName, rpcCall);

      // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
      // used in two different semantics.
      // (1) The first is to close the client scanner and bubble up the exception all the way
      // to the application. This is preferred when the exception is really un-recoverable
      // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
      // bucket usually.
      // (2) Second semantics is to close the current region scanner only, but continue the
      // client scanner by overriding the exception. This is usually UnknownScannerException,
      // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
      // application-level ClientScanner has to continue without bubbling up the exception to
      // the client. See ClientScanner code to see how it deals with these special exceptions.
      if (e instanceof DoNotRetryIOException) {
        throw e;
      }

      // If it is a FileNotFoundException, wrap as a
      // DoNotRetryIOException. This can avoid the retry in ClientScanner.
      if (e instanceof FileNotFoundException) {
        throw new DoNotRetryIOException(e);
      }

      // We closed the scanner already. Instead of throwing the IOException, and client
      // retrying with the same scannerId only to get USE on the next RPC, we directly throw
      // a special exception to save an RPC.
      if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
        // 1.4.0+ clients know how to handle
        throw new ScannerResetException("Scanner is closed on the server-side", e);
      } else {
        // older clients do not know about SRE. Just throw USE, which they will handle
        throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
          + " scanner state for clients older than 1.3.", e);
      }
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
  } finally {
    if (!scannerClosed) {
      // Adding resets expiration time on lease.
      // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
      if (rpcCall != null) {
        rpcCall.setCallBack(rsh.shippedCallback);
      } else {
        // If context is null,here we call rsh.shippedCallback directly to reuse the logic in
        // rsh.shippedCallback to release the internal resources in rsh,and lease is also added
        // back to regionserver's LeaseManager in rsh.shippedCallback.
        runShippedCallback(rsh);
      }
    }
    quota.close();
  }
}

/**
 * Runs the holder's shipped callback immediately (used when there is no RpcCall to attach it
 * to); releases scanner-internal resources and re-adds the lease. IOExceptions are surfaced as
 * ServiceException to match the RPC contract.
 */
private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
  assert rsh.shippedCallback != null;
  try {
    rsh.shippedCallback.run();
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}

/**
 * Closes and deregisters a scanner, honoring the preScannerClose coprocessor bypass. When an
 * RPC context is present the actual close is deferred to the call's close callback; otherwise
 * the scanner is closed inline. The scanner name is remembered in closedScanners so a late
 * close request from an old client can be ignored.
 */
private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
  RpcCallContext context) throws IOException {
  if (region.getCoprocessorHost() != null) {
    if (region.getCoprocessorHost().preScannerClose(scanner)) {
      // bypass the actual close.
3757 return; 3758 } 3759 } 3760 RegionScannerHolder rsh = scanners.remove(scannerName); 3761 if (rsh != null) { 3762 if (context != null) { 3763 context.setCallBack(rsh.closeCallBack); 3764 } else { 3765 rsh.s.close(); 3766 } 3767 if (region.getCoprocessorHost() != null) { 3768 region.getCoprocessorHost().postScannerClose(scanner); 3769 } 3770 closedScanners.put(scannerName, scannerName); 3771 } 3772 } 3773 3774 @Override 3775 public CoprocessorServiceResponse execRegionServerService(RpcController controller, 3776 CoprocessorServiceRequest request) throws ServiceException { 3777 rpcPreCheck("execRegionServerService"); 3778 return server.execRegionServerService(controller, request); 3779 } 3780 3781 @Override 3782 public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller, 3783 GetSpaceQuotaSnapshotsRequest request) throws ServiceException { 3784 try { 3785 final RegionServerSpaceQuotaManager manager = server.getRegionServerSpaceQuotaManager(); 3786 final GetSpaceQuotaSnapshotsResponse.Builder builder = 3787 GetSpaceQuotaSnapshotsResponse.newBuilder(); 3788 if (manager != null) { 3789 final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots(); 3790 for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) { 3791 builder.addSnapshots(TableQuotaSnapshot.newBuilder() 3792 .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey())) 3793 .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build()); 3794 } 3795 } 3796 return builder.build(); 3797 } catch (Exception e) { 3798 throw new ServiceException(e); 3799 } 3800 } 3801 3802 @Override 3803 public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller, 3804 ClearRegionBlockCacheRequest request) throws ServiceException { 3805 rpcPreCheck("clearRegionBlockCache"); 3806 ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder(); 3807 CacheEvictionStatsBuilder stats = 
CacheEvictionStats.builder(); 3808 List<HRegion> regions = getRegions(request.getRegionList(), stats); 3809 for (HRegion region : regions) { 3810 try { 3811 stats = stats.append(this.server.clearRegionBlockCache(region)); 3812 } catch (Exception e) { 3813 stats.addException(region.getRegionInfo().getRegionName(), e); 3814 } 3815 } 3816 stats.withMaxCacheSize(server.getBlockCache().map(BlockCache::getMaxSize).orElse(0L)); 3817 return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build(); 3818 } 3819 3820 private void executeOpenRegionProcedures(OpenRegionRequest request, 3821 Map<TableName, TableDescriptor> tdCache) { 3822 long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1; 3823 for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) { 3824 RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion()); 3825 TableName tableName = regionInfo.getTable(); 3826 TableDescriptor tableDesc = tdCache.get(tableName); 3827 if (tableDesc == null) { 3828 try { 3829 tableDesc = server.getTableDescriptors().get(regionInfo.getTable()); 3830 } catch (IOException e) { 3831 // Here we do not fail the whole method since we also need deal with other 3832 // procedures, and we can not ignore this one, so we still schedule a 3833 // AssignRegionHandler and it will report back to master if we still can not get the 3834 // TableDescriptor. 
3835 LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler", 3836 regionInfo.getTable(), e); 3837 } 3838 if (tableDesc != null) { 3839 tdCache.put(tableName, tableDesc); 3840 } 3841 } 3842 if (regionOpenInfo.getFavoredNodesCount() > 0) { 3843 server.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(), 3844 regionOpenInfo.getFavoredNodesList()); 3845 } 3846 long procId = regionOpenInfo.getOpenProcId(); 3847 if (server.submitRegionProcedure(procId)) { 3848 server.getExecutorService().submit( 3849 AssignRegionHandler.create(server, regionInfo, procId, tableDesc, masterSystemTime)); 3850 } 3851 } 3852 } 3853 3854 private void executeCloseRegionProcedures(CloseRegionRequest request) { 3855 String encodedName; 3856 try { 3857 encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion()); 3858 } catch (DoNotRetryIOException e) { 3859 throw new UncheckedIOException("Should not happen", e); 3860 } 3861 ServerName destination = request.hasDestinationServer() 3862 ? 
ProtobufUtil.toServerName(request.getDestinationServer()) 3863 : null; 3864 long procId = request.getCloseProcId(); 3865 boolean evictCache = request.getEvictCache(); 3866 if (server.submitRegionProcedure(procId)) { 3867 server.getExecutorService().submit( 3868 UnassignRegionHandler.create(server, encodedName, procId, false, destination, evictCache)); 3869 } 3870 } 3871 3872 private void executeProcedures(RemoteProcedureRequest request) { 3873 RSProcedureCallable callable; 3874 try { 3875 callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class) 3876 .getDeclaredConstructor().newInstance(); 3877 } catch (Exception e) { 3878 LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(), 3879 request.getProcId(), e); 3880 server.remoteProcedureComplete(request.getProcId(), e); 3881 return; 3882 } 3883 callable.init(request.getProcData().toByteArray(), server); 3884 LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId()); 3885 server.executeProcedure(request.getProcId(), callable); 3886 } 3887 3888 @Override 3889 @QosPriority(priority = HConstants.ADMIN_QOS) 3890 public ExecuteProceduresResponse executeProcedures(RpcController controller, 3891 ExecuteProceduresRequest request) throws ServiceException { 3892 try { 3893 checkOpen(); 3894 throwOnWrongStartCode(request); 3895 server.getRegionServerCoprocessorHost().preExecuteProcedures(); 3896 if (request.getOpenRegionCount() > 0) { 3897 // Avoid reading from the TableDescritor every time(usually it will read from the file 3898 // system) 3899 Map<TableName, TableDescriptor> tdCache = new HashMap<>(); 3900 request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache)); 3901 } 3902 if (request.getCloseRegionCount() > 0) { 3903 request.getCloseRegionList().forEach(this::executeCloseRegionProcedures); 3904 } 3905 if (request.getProcCount() > 0) { 3906 request.getProcList().forEach(this::executeProcedures); 3907 } 
3908 server.getRegionServerCoprocessorHost().postExecuteProcedures(); 3909 return ExecuteProceduresResponse.getDefaultInstance(); 3910 } catch (IOException e) { 3911 throw new ServiceException(e); 3912 } 3913 } 3914 3915 @Override 3916 public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller, 3917 GetAllBootstrapNodesRequest request) throws ServiceException { 3918 GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder(); 3919 server.getBootstrapNodes() 3920 .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server))); 3921 return builder.build(); 3922 } 3923 3924 private void setReloadableGuardrails(Configuration conf) { 3925 rowSizeWarnThreshold = 3926 conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT); 3927 rejectRowsWithSizeOverThreshold = 3928 conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD); 3929 maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 3930 HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); 3931 } 3932 3933 @Override 3934 public void onConfigurationChange(Configuration conf) { 3935 super.onConfigurationChange(conf); 3936 setReloadableGuardrails(conf); 3937 } 3938 3939 @Override 3940 public GetCachedFilesListResponse getCachedFilesList(RpcController controller, 3941 GetCachedFilesListRequest request) throws ServiceException { 3942 GetCachedFilesListResponse.Builder responseBuilder = GetCachedFilesListResponse.newBuilder(); 3943 List<String> fullyCachedFiles = new ArrayList<>(); 3944 server.getBlockCache().flatMap(BlockCache::getFullyCachedFiles).ifPresent(fcf -> { 3945 fullyCachedFiles.addAll(fcf.keySet()); 3946 }); 3947 return responseBuilder.addAllCachedFiles(fullyCachedFiles).build(); 3948 } 3949}