001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.UncheckedIOException; 024import java.net.BindException; 025import java.net.InetAddress; 026import java.net.InetSocketAddress; 027import java.nio.ByteBuffer; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.HashMap; 032import java.util.Iterator; 033import java.util.List; 034import java.util.Map; 035import java.util.Map.Entry; 036import java.util.NavigableMap; 037import java.util.Set; 038import java.util.TreeSet; 039import java.util.concurrent.ConcurrentHashMap; 040import java.util.concurrent.ConcurrentMap; 041import java.util.concurrent.TimeUnit; 042import java.util.concurrent.atomic.AtomicBoolean; 043import java.util.concurrent.atomic.AtomicLong; 044import java.util.concurrent.atomic.LongAdder; 045import org.apache.commons.lang3.mutable.MutableObject; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.fs.FileSystem; 048import 
org.apache.hadoop.fs.Path; 049import org.apache.hadoop.hbase.ByteBufferExtendedCell; 050import org.apache.hadoop.hbase.CacheEvictionStats; 051import org.apache.hadoop.hbase.CacheEvictionStatsBuilder; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.CellScannable; 054import org.apache.hadoop.hbase.CellScanner; 055import org.apache.hadoop.hbase.CellUtil; 056import org.apache.hadoop.hbase.DoNotRetryIOException; 057import org.apache.hadoop.hbase.DroppedSnapshotException; 058import org.apache.hadoop.hbase.HBaseIOException; 059import org.apache.hadoop.hbase.HBaseRpcServicesBase; 060import org.apache.hadoop.hbase.HConstants; 061import org.apache.hadoop.hbase.MultiActionResultTooLarge; 062import org.apache.hadoop.hbase.NotServingRegionException; 063import org.apache.hadoop.hbase.PrivateCellUtil; 064import org.apache.hadoop.hbase.RegionTooBusyException; 065import org.apache.hadoop.hbase.Server; 066import org.apache.hadoop.hbase.ServerName; 067import org.apache.hadoop.hbase.TableName; 068import org.apache.hadoop.hbase.UnknownScannerException; 069import org.apache.hadoop.hbase.client.Append; 070import org.apache.hadoop.hbase.client.CheckAndMutate; 071import org.apache.hadoop.hbase.client.CheckAndMutateResult; 072import org.apache.hadoop.hbase.client.Delete; 073import org.apache.hadoop.hbase.client.Durability; 074import org.apache.hadoop.hbase.client.Get; 075import org.apache.hadoop.hbase.client.Increment; 076import org.apache.hadoop.hbase.client.Mutation; 077import org.apache.hadoop.hbase.client.OperationWithAttributes; 078import org.apache.hadoop.hbase.client.Put; 079import org.apache.hadoop.hbase.client.RegionInfo; 080import org.apache.hadoop.hbase.client.RegionReplicaUtil; 081import org.apache.hadoop.hbase.client.Result; 082import org.apache.hadoop.hbase.client.Row; 083import org.apache.hadoop.hbase.client.Scan; 084import org.apache.hadoop.hbase.client.TableDescriptor; 085import org.apache.hadoop.hbase.client.VersionInfoUtil; 086import 
org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 087import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 088import org.apache.hadoop.hbase.exceptions.ScannerResetException; 089import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 090import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 091import org.apache.hadoop.hbase.io.ByteBuffAllocator; 092import org.apache.hadoop.hbase.io.hfile.BlockCache; 093import org.apache.hadoop.hbase.ipc.HBaseRpcController; 094import org.apache.hadoop.hbase.ipc.PriorityFunction; 095import org.apache.hadoop.hbase.ipc.QosPriority; 096import org.apache.hadoop.hbase.ipc.RpcCall; 097import org.apache.hadoop.hbase.ipc.RpcCallContext; 098import org.apache.hadoop.hbase.ipc.RpcCallback; 099import org.apache.hadoop.hbase.ipc.RpcServer; 100import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 101import org.apache.hadoop.hbase.ipc.RpcServerFactory; 102import org.apache.hadoop.hbase.ipc.RpcServerInterface; 103import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 104import org.apache.hadoop.hbase.ipc.ServerRpcController; 105import org.apache.hadoop.hbase.net.Address; 106import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 107import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; 108import org.apache.hadoop.hbase.quotas.OperationQuota; 109import org.apache.hadoop.hbase.quotas.QuotaUtil; 110import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 111import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 112import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 113import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; 114import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease; 115import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException; 116import org.apache.hadoop.hbase.regionserver.Region.Operation; 117import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 
118import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 119import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler; 120import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; 121import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; 122import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; 123import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; 124import org.apache.hadoop.hbase.replication.ReplicationUtils; 125import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker; 126import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker; 127import org.apache.hadoop.hbase.security.Superusers; 128import org.apache.hadoop.hbase.security.access.Permission; 129import org.apache.hadoop.hbase.util.Bytes; 130import org.apache.hadoop.hbase.util.DNS; 131import org.apache.hadoop.hbase.util.DNS.ServerType; 132import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 133import org.apache.hadoop.hbase.util.Pair; 134import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 135import org.apache.hadoop.hbase.wal.WAL; 136import org.apache.hadoop.hbase.wal.WALEdit; 137import org.apache.hadoop.hbase.wal.WALKey; 138import org.apache.hadoop.hbase.wal.WALSplitUtil; 139import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 140import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 141import org.apache.yetus.audience.InterfaceAudience; 142import org.slf4j.Logger; 143import org.slf4j.LoggerFactory; 144 145import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 146import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 147import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 148import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 149import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 150import 
org.apache.hbase.thirdparty.com.google.protobuf.Message; 151import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 152import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 153import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 154import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 155import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 156 157import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 158import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 159import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 173import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 193import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 214import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 236import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;

/**
 * Implements the regionserver RPC services.
 */
@InterfaceAudience.Private
public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
  implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface {

  private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);

  /** RPC scheduler to use for the region server. */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Cap on scanner result sizes; combined (via min) with the quota's read-available when
  // sizing responses.
  private final long maxScannerResultSize;

  private ScannerIdGenerator scannerIdGenerator;
  // Outstanding scanners keyed by scanner name; entries are removed on close or lease expiry
  // (see ScannerListener.leaseExpired below).
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
  // which may send next or close request to a region scanner which has already been exhausted. The
  // entries will be removed automatically after scannerLeaseTimeoutPeriod.
  private final Cache<String, String> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;
  /*
   * Whether we should reject requests with very high no of rows i.e. beyond threshold defined by
   * rowSizeWarnThreshold
   */
  private final boolean rejectRowsWithSizeOverThreshold;

  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below booleans
   * to selectively enable/disable either Admin or Client Service (Rare is the case where you would
   * ever turn off one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
    "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
    "hbase.regionserver.client.executorService";
  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
    "hbase.regionserver.client.meta.executorService";

  /**
   * An Rpc callback for closing a RegionScanner.
   */
  private static final class RegionScannerCloseCallBack implements RpcCallback {

    private final RegionScanner scanner;

    public RegionScannerCloseCallBack(RegionScanner scanner) {
      this.scanner = scanner;
    }

    @Override
    public void run() throws IOException {
      this.scanner.close();
    }
  }

  /**
   * An Rpc callback for doing shipped() call on a RegionScanner.
   */
  private class RegionScannerShippedCallBack implements RpcCallback {
    private final String scannerName;
    private final Shipper shipper;
    private final Lease lease;

    public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
      this.scannerName = scannerName;
      this.shipper = shipper;
      this.lease = lease;
    }

    @Override
    public void run() throws IOException {
      this.shipper.shipped();
      // We're done. On way out re-add the above removed lease. The lease was temp removed for this
      // Rpc call and we are at end of the call now. Time to add it back.
      // Only re-add if the scanner is still registered (it may have been closed meanwhile).
      if (scanners.containsKey(scannerName)) {
        if (lease != null) {
          server.getLeaseManager().addLease(lease);
        }
      }
    }
  }

  /**
   * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
   * completion of multiGets.
   */
  static class RegionScannersCloseCallBack implements RpcCallback {
    private final List<RegionScanner> scanners = new ArrayList<>();

    public void addScanner(RegionScanner scanner) {
      this.scanners.add(scanner);
    }

    @Override
    public void run() {
      // Close every scanner; log (but do not propagate) per-scanner close failures so one bad
      // scanner does not prevent the rest from being closed.
      for (RegionScanner scanner : scanners) {
        try {
          scanner.close();
        } catch (IOException e) {
          LOG.error("Exception while closing the scanner " + scanner, e);
        }
      }
    }
  }

  /**
   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
419 */ 420 static final class RegionScannerHolder { 421 private final AtomicLong nextCallSeq = new AtomicLong(0); 422 private final RegionScanner s; 423 private final HRegion r; 424 private final RpcCallback closeCallBack; 425 private final RpcCallback shippedCallback; 426 private byte[] rowOfLastPartialResult; 427 private boolean needCursor; 428 private boolean fullRegionScan; 429 private final String clientIPAndPort; 430 private final String userName; 431 432 RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack, 433 RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan, 434 String clientIPAndPort, String userName) { 435 this.s = s; 436 this.r = r; 437 this.closeCallBack = closeCallBack; 438 this.shippedCallback = shippedCallback; 439 this.needCursor = needCursor; 440 this.fullRegionScan = fullRegionScan; 441 this.clientIPAndPort = clientIPAndPort; 442 this.userName = userName; 443 } 444 445 long getNextCallSeq() { 446 return nextCallSeq.get(); 447 } 448 449 boolean incNextCallSeq(long currentSeq) { 450 // Use CAS to prevent multiple scan request running on the same scanner. 451 return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1); 452 } 453 454 // Should be called only when we need to print lease expired messages otherwise 455 // cache the String once made. 456 @Override 457 public String toString() { 458 return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName 459 + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString(); 460 } 461 } 462 463 /** 464 * Instantiated as a scanner lease. 
If the lease times out, the scanner is closed
   */
  private class ScannerListener implements LeaseListener {
    private final String scannerName;

    ScannerListener(final String n) {
      this.scannerName = n;
    }

    @Override
    public void leaseExpired() {
      // Remove the holder first so no further scan calls can find this scanner.
      RegionScannerHolder rsh = scanners.remove(this.scannerName);
      if (rsh == null) {
        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
        return;
      }
      LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
      server.getMetrics().incrScannerLeaseExpired();
      RegionScanner s = rsh.s;
      HRegion region = null;
      try {
        region = server.getRegion(s.getRegionInfo().getRegionName());
        if (region != null && region.getCoprocessorHost() != null) {
          region.getCoprocessorHost().preScannerClose(s);
        }
      } catch (IOException e) {
        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
      } finally {
        // Always close the scanner itself, even if the coprocessor pre-hook failed.
        try {
          s.close();
          if (region != null && region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().postScannerClose(s);
          }
        } catch (IOException e) {
          LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
        }
      }
    }
  }

  private static ResultOrException getResultOrException(final ClientProtos.Result r,
    final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(r), index);
  }

  private static ResultOrException getResultOrException(final Exception e, final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(e), index);
  }

  private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
    final int index) {
    return builder.setIndex(index).build();
  }

  /**
   * Checks for the following pre-checks in order:
   * <ol>
   * <li>RegionServer is running</li>
   * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
   * </ol>
   * @param requestName name of rpc request. Used in reporting failures to provide context.
   * @throws ServiceException If any of the above listed pre-check fails.
   */
  private void rpcPreCheck(String requestName) throws ServiceException {
    try {
      checkOpen();
      requirePermission(requestName, Permission.Action.ADMIN);
    } catch (IOException ioe) {
      // Wrap so callers of the protobuf service surface only ServiceException.
      throw new ServiceException(ioe);
    }
  }

  private boolean isClientCellBlockSupport(RpcCallContext context) {
    return context != null && context.isClientCellBlockSupported();
  }

  private void addResult(final MutateResponse.Builder builder, final Result result,
    final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
    if (result == null) return;
    if (clientCellBlockSupported) {
      // Cells travel out-of-band in the cell block; the pb Result carries metadata only.
      builder.setResult(ProtobufUtil.toResultNoData(result));
      rpcc.setCellScanner(result.cellScanner());
    } else {
      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
      builder.setResult(pbr);
    }
  }

  private void addResults(ScanResponse.Builder builder, List<Result> results,
    HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
    // Results from a non-default replica are marked stale for the client.
    builder.setStale(!isDefaultRegion);
    if (results.isEmpty()) {
      return;
    }
    if (clientCellBlockSupported) {
      for (Result res : results) {
        builder.addCellsPerResult(res.size());
        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
      }
      controller.setCellScanner(CellUtil.createCellScanner(results));
    } else {
      for (Result res : results) {
        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
        builder.addResults(pbr);
      }
    }
  }

  private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
    CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Number of mutations fully consumed from the cell scanner; used in the finally block to
    // skip cells of any mutations we did not get to.
    int countOfCompleteMutation = 0;
    try {
      if
(!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      List<Mutation> mutations = new ArrayList<>();
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : actions) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto mutation = action.getMutation();
        MutationType type = mutation.getMutateType();
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(mutation, cellScanner);
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            mutations.add(put);
            break;
          case DELETE:
            Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
            ++countOfCompleteMutation;
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            mutations.add(del);
            break;
          case INCREMENT:
            Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
            // Only increments/appends carry a nonce; the last one seen wins.
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, increment);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
            mutations.add(increment);
            break;
          case APPEND:
            Append append = ProtobufUtil.toAppend(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, append);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
            mutations.add(append);
            break;
          default:
            throw new DoNotRetryIOException("invalid mutation type : " + type);
        }
      }

      if (mutations.size() == 0) {
        // Nothing to do; treat as a successful check with no result.
        return new CheckAndMutateResult(true, null);
      } else {
        CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
        CheckAndMutateResult result = null;
        if (region.getCoprocessorHost() != null) {
          // A non-null pre-hook result bypasses the region operation and the post-hook.
          result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
        }
        if (result == null) {
          result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
          if (region.getCoprocessorHost() != null) {
            result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
          }
        }
        return result;
      }
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }

  /**
   * Execute an append mutation.
   * @return the Result of the append; never null (an empty Result is returned when the region
   *         returns null, e.g. when a RegionObserver bypasses the default operation)
   */
  private Result append(final HRegion region, final OperationQuota quota,
    final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
    ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
    checkCellSizeLimit(region, append);
    spaceQuota.getPolicyEnforcement(region).check(append);
    quota.addMutation(append);
    long nonce = mutation.hasNonce() ?
mutation.getNonce() : HConstants.NO_NONCE; 663 Result r = region.append(append, nonceGroup, nonce); 664 if (server.getMetrics() != null) { 665 server.getMetrics().updateAppend(region.getTableDescriptor().getTableName(), 666 EnvironmentEdgeManager.currentTime() - before); 667 } 668 return r == null ? Result.EMPTY_RESULT : r; 669 } 670 671 /** 672 * Execute an increment mutation. 673 */ 674 private Result increment(final HRegion region, final OperationQuota quota, 675 final MutationProto mutation, final CellScanner cells, long nonceGroup, 676 ActivePolicyEnforcement spaceQuota) throws IOException { 677 long before = EnvironmentEdgeManager.currentTime(); 678 Increment increment = ProtobufUtil.toIncrement(mutation, cells); 679 checkCellSizeLimit(region, increment); 680 spaceQuota.getPolicyEnforcement(region).check(increment); 681 quota.addMutation(increment); 682 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 683 Result r = region.increment(increment, nonceGroup, nonce); 684 final MetricsRegionServer metricsRegionServer = server.getMetrics(); 685 if (metricsRegionServer != null) { 686 metricsRegionServer.updateIncrement(region.getTableDescriptor().getTableName(), 687 EnvironmentEdgeManager.currentTime() - before); 688 } 689 return r == null ? Result.EMPTY_RESULT : r; 690 } 691 692 /** 693 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when 694 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation. 695 * @param cellsToReturn Could be null. May be allocated in this method. This is what this method 696 * returns as a 'result'. 
697 * @param closeCallBack the callback to be used with multigets 698 * @param context the current RpcCallContext 699 * @return Return the <code>cellScanner</code> passed 700 */ 701 private List<CellScannable> doNonAtomicRegionMutation(final HRegion region, 702 final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner, 703 final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup, 704 final RegionScannersCloseCallBack closeCallBack, RpcCallContext context, 705 ActivePolicyEnforcement spaceQuotaEnforcement) { 706 // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do 707 // one at a time, we instead pass them in batch. Be aware that the corresponding 708 // ResultOrException instance that matches each Put or Delete is then added down in the 709 // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are 710 // deferred/batched 711 List<ClientProtos.Action> mutations = null; 712 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); 713 IOException sizeIOE = null; 714 Object lastBlock = null; 715 ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = 716 ResultOrException.newBuilder(); 717 boolean hasResultOrException = false; 718 for (ClientProtos.Action action : actions.getActionList()) { 719 hasResultOrException = false; 720 resultOrExceptionBuilder.clear(); 721 try { 722 Result r = null; 723 724 if ( 725 context != null && context.isRetryImmediatelySupported() 726 && (context.getResponseCellSize() > maxQuotaResultSize 727 || context.getResponseBlockSize() + context.getResponseExceptionSize() 728 > maxQuotaResultSize) 729 ) { 730 731 // We're storing the exception since the exception and reason string won't 732 // change after the response size limit is reached. 733 if (sizeIOE == null) { 734 // We don't need the stack un-winding do don't throw the exception. 735 // Throwing will kill the JVM's JIT. 
736 // 737 // Instead just create the exception and then store it. 738 sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: " 739 + context.getResponseCellSize() + " BlockSize: " + context.getResponseBlockSize()); 740 741 // Only report the exception once since there's only one request that 742 // caused the exception. Otherwise this number will dominate the exceptions count. 743 rpcServer.getMetrics().exception(sizeIOE); 744 } 745 746 // Now that there's an exception is known to be created 747 // use it for the response. 748 // 749 // This will create a copy in the builder. 750 NameBytesPair pair = ResponseConverter.buildException(sizeIOE); 751 resultOrExceptionBuilder.setException(pair); 752 context.incrementResponseExceptionSize(pair.getSerializedSize()); 753 resultOrExceptionBuilder.setIndex(action.getIndex()); 754 builder.addResultOrException(resultOrExceptionBuilder.build()); 755 skipCellsForMutation(action, cellScanner); 756 continue; 757 } 758 if (action.hasGet()) { 759 long before = EnvironmentEdgeManager.currentTime(); 760 ClientProtos.Get pbGet = action.getGet(); 761 // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do 762 // a get closest before. Throwing the UnknownProtocolException signals it that it needs 763 // to switch and do hbase2 protocol (HBase servers do not tell clients what versions 764 // they are; its a problem for non-native clients like asynchbase. HBASE-20225. 765 if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) { 766 throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? 
" 767 + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " 768 + "reverse Scan."); 769 } 770 try { 771 Get get = ProtobufUtil.toGet(pbGet); 772 if (context != null) { 773 r = get(get, (region), closeCallBack, context); 774 } else { 775 r = region.get(get); 776 } 777 } finally { 778 final MetricsRegionServer metricsRegionServer = server.getMetrics(); 779 if (metricsRegionServer != null) { 780 metricsRegionServer.updateGet(region.getTableDescriptor().getTableName(), 781 EnvironmentEdgeManager.currentTime() - before); 782 } 783 } 784 } else if (action.hasServiceCall()) { 785 hasResultOrException = true; 786 Message result = execServiceOnRegion(region, action.getServiceCall()); 787 ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder = 788 ClientProtos.CoprocessorServiceResult.newBuilder(); 789 resultOrExceptionBuilder.setServiceResult(serviceResultBuilder 790 .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName()) 791 // TODO: Copy!!! 792 .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray())))); 793 } else if (action.hasMutation()) { 794 MutationType type = action.getMutation().getMutateType(); 795 if ( 796 type != MutationType.PUT && type != MutationType.DELETE && mutations != null 797 && !mutations.isEmpty() 798 ) { 799 // Flush out any Puts or Deletes already collected. 
800 doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, 801 spaceQuotaEnforcement); 802 mutations.clear(); 803 } 804 switch (type) { 805 case APPEND: 806 r = append(region, quota, action.getMutation(), cellScanner, nonceGroup, 807 spaceQuotaEnforcement); 808 break; 809 case INCREMENT: 810 r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup, 811 spaceQuotaEnforcement); 812 break; 813 case PUT: 814 case DELETE: 815 // Collect the individual mutations and apply in a batch 816 if (mutations == null) { 817 mutations = new ArrayList<>(actions.getActionCount()); 818 } 819 mutations.add(action); 820 break; 821 default: 822 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); 823 } 824 } else { 825 throw new HBaseIOException("Unexpected Action type"); 826 } 827 if (r != null) { 828 ClientProtos.Result pbResult = null; 829 if (isClientCellBlockSupport(context)) { 830 pbResult = ProtobufUtil.toResultNoData(r); 831 // Hard to guess the size here. Just make a rough guess. 832 if (cellsToReturn == null) { 833 cellsToReturn = new ArrayList<>(); 834 } 835 cellsToReturn.add(r); 836 } else { 837 pbResult = ProtobufUtil.toResult(r); 838 } 839 lastBlock = addSize(context, r, lastBlock); 840 hasResultOrException = true; 841 resultOrExceptionBuilder.setResult(pbResult); 842 } 843 // Could get to here and there was no result and no exception. Presumes we added 844 // a Put or Delete to the collecting Mutations List for adding later. In this 845 // case the corresponding ResultOrException instance for the Put or Delete will be added 846 // down in the doNonAtomicBatchOp method call rather than up here. 
847 } catch (IOException ie) { 848 rpcServer.getMetrics().exception(ie); 849 hasResultOrException = true; 850 NameBytesPair pair = ResponseConverter.buildException(ie); 851 resultOrExceptionBuilder.setException(pair); 852 context.incrementResponseExceptionSize(pair.getSerializedSize()); 853 } 854 if (hasResultOrException) { 855 // Propagate index. 856 resultOrExceptionBuilder.setIndex(action.getIndex()); 857 builder.addResultOrException(resultOrExceptionBuilder.build()); 858 } 859 } 860 // Finish up any outstanding mutations 861 if (!CollectionUtils.isEmpty(mutations)) { 862 doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); 863 } 864 return cellsToReturn; 865 } 866 867 private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException { 868 if (r.maxCellSize > 0) { 869 CellScanner cells = m.cellScanner(); 870 while (cells.advance()) { 871 int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current()); 872 if (size > r.maxCellSize) { 873 String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of " 874 + r.maxCellSize + " bytes"; 875 LOG.debug(msg); 876 throw new DoNotRetryIOException(msg); 877 } 878 } 879 } 880 } 881 882 private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, 883 final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells, 884 long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { 885 // Just throw the exception. The exception will be caught and then added to region-level 886 // exception for RegionAction. Leaving the null to action result is ok since the null 887 // result is viewed as failure by hbase client. And the region-lever exception will be used 888 // to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and 889 // AsyncBatchRpcRetryingCaller#onComplete for more details. 
    doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
  }

  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    try {
      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
        spaceQuotaEnforcement, false);
    } catch (IOException e) {
      // Set the exception for each action. The mutations in same RegionAction are grouped to
      // different batches and then processed individually. Hence, we don't set the region-level
      // exception here for the whole RegionAction.
      for (Action mutation : mutations) {
        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
      }
    }
  }

  /**
   * Execute a list of mutations, adding a ResultOrException per action to {@code builder}.
   */
  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
    throws IOException {
    Mutation[] mArray = new Mutation[mutations.size()];
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      /**
       * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions
       * since mutation array may have been reordered. In order to return the right result or
       * exception to the corresponding actions, we need to know which action the mutation belongs
       * to. We can't sort ClientProtos.Action array, since they are bonded to cellscanners.
       */
      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
      int i = 0;
      // NOTE(review): if the batch holds more than one Increment/Append, 'nonce' keeps only the
      // last one's nonce — presumably an atomic batch carries at most one; TODO confirm.
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : mutations) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto m = action.getMutation();
        Mutation mutation;
        switch (m.getMutateType()) {
          case PUT:
            mutation = ProtobufUtil.toPut(m, cells);
            batchContainsPuts = true;
            break;

          case DELETE:
            mutation = ProtobufUtil.toDelete(m, cells);
            batchContainsDelete = true;
            break;

          case INCREMENT:
            mutation = ProtobufUtil.toIncrement(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          case APPEND:
            mutation = ProtobufUtil.toAppend(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          default:
            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
        }
        mutationActionMap.put(mutation, action);
        mArray[i++] = mutation;
        checkCellSizeLimit(region, mutation);
        // Check if a space quota disallows this mutation
        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
        quota.addMutation(mutation);
      }

      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }

      // HBASE-17924
      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic,
      // order is preserved as it's expected from the client
      if (!atomic) {
        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
      }

      OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);

      // When atomic is true, it indicates that the mutateRow API or the batch API with
      // RowMutations is called. In this case, we need to merge the results of the
      // Increment/Append operations if the mutations include those operations, and set the merged
      // result to the first element of the ResultOrException list
      if (atomic) {
        List<ResultOrException> resultOrExceptions = new ArrayList<>();
        List<Result> results = new ArrayList<>();
        for (i = 0; i < codes.length; i++) {
          if (codes[i].getResult() != null) {
            results.add(codes[i].getResult());
          }
          if (i != 0) {
            // Positions 1..n-1 carry empty results; the merged result goes at position 0 below.
            resultOrExceptions
              .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i));
          }
        }

        if (results.isEmpty()) {
          builder.addResultOrException(
            getResultOrException(ClientProtos.Result.getDefaultInstance(), 0));
        } else {
          // Merge the results of the Increment/Append operations
          List<Cell> cellList = new ArrayList<>();
          for (Result result : results) {
            if (result.rawCells() != null) {
              cellList.addAll(Arrays.asList(result.rawCells()));
            }
          }
          Result result = Result.create(cellList);

          // Set the merged result of the Increment/Append operations to the first element of the
          // ResultOrException list
          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
        }

        builder.addAllResultOrException(resultOrExceptions);
        return;
      }

      for (i = 0; i < codes.length; i++) {
        Mutation currentMutation = mArray[i];
        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
        Exception e;
        // Note: the 'default' label sits before SUCCESS/STORE_TOO_BUSY; this is legal Java and
        // behaviorally fine since every arm breaks — default only catches unlisted codes.
        switch (codes[i].getOperationStatusCode()) {
          case BAD_FAMILY:
            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SANITY_CHECK_FAILURE:
            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          default:
            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SUCCESS:
            builder.addResultOrException(
              getResultOrException(ClientProtos.Result.getDefaultInstance(), index));
            break;

          case STORE_TOO_BUSY:
            e = new RegionTooBusyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;
        }
      }
    } finally {
      int processedMutationIndex = 0;
      for (Action mutation : mutations) {
        // The non-null mArray[i] means the cell scanner has been read.
        if (mArray[processedMutationIndex++] == null) {
          skipCellsForMutation(mutation, cells);
        }
      }
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  /**
   * Record put/delete batch latencies against this region's table, if metrics are enabled.
   */
  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
    boolean batchContainsDelete) {
    final MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      if (batchContainsPuts) {
        metricsRegionServer.updatePutBatch(region.getTableDescriptor().getTableName(),
          after - starttime);
      }
      if (batchContainsDelete) {
        metricsRegionServer.updateDeleteBatch(region.getTableDescriptor().getTableName(),
          after - starttime);
      }
    }
  }

  /**
   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
   *         exceptionMessage if any
   * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying
   *             edits for secondary replicas any more, see
   *             {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}.
1088 */ 1089 @Deprecated 1090 private OperationStatus[] doReplayBatchOp(final HRegion region, 1091 final List<MutationReplay> mutations, long replaySeqId) throws IOException { 1092 long before = EnvironmentEdgeManager.currentTime(); 1093 boolean batchContainsPuts = false, batchContainsDelete = false; 1094 try { 1095 for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) { 1096 MutationReplay m = it.next(); 1097 1098 if (m.getType() == MutationType.PUT) { 1099 batchContainsPuts = true; 1100 } else { 1101 batchContainsDelete = true; 1102 } 1103 1104 NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap(); 1105 List<Cell> metaCells = map.get(WALEdit.METAFAMILY); 1106 if (metaCells != null && !metaCells.isEmpty()) { 1107 for (Cell metaCell : metaCells) { 1108 CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell); 1109 boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); 1110 HRegion hRegion = region; 1111 if (compactionDesc != null) { 1112 // replay the compaction. 
Remove the files from stores only if we are the primary 1113 // region replica (thus own the files) 1114 hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica, 1115 replaySeqId); 1116 continue; 1117 } 1118 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell); 1119 if (flushDesc != null && !isDefaultReplica) { 1120 hRegion.replayWALFlushMarker(flushDesc, replaySeqId); 1121 continue; 1122 } 1123 RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell); 1124 if (regionEvent != null && !isDefaultReplica) { 1125 hRegion.replayWALRegionEventMarker(regionEvent); 1126 continue; 1127 } 1128 BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell); 1129 if (bulkLoadEvent != null) { 1130 hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent); 1131 continue; 1132 } 1133 } 1134 it.remove(); 1135 } 1136 } 1137 requestCount.increment(); 1138 if (!region.getRegionInfo().isMetaRegion()) { 1139 server.getMemStoreFlusher().reclaimMemStoreMemory(); 1140 } 1141 return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]), 1142 replaySeqId); 1143 } finally { 1144 updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete); 1145 } 1146 } 1147 1148 private void closeAllScanners() { 1149 // Close any outstanding scanners. Means they'll get an UnknownScanner 1150 // exception next time they come in. 
1151 for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) { 1152 try { 1153 e.getValue().s.close(); 1154 } catch (IOException ioe) { 1155 LOG.warn("Closing scanner " + e.getKey(), ioe); 1156 } 1157 } 1158 } 1159 1160 // Directly invoked only for testing 1161 public RSRpcServices(final HRegionServer rs) throws IOException { 1162 super(rs, rs.getProcessName()); 1163 final Configuration conf = rs.getConfiguration(); 1164 rowSizeWarnThreshold = 1165 conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT); 1166 rejectRowsWithSizeOverThreshold = 1167 conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD); 1168 scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 1169 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); 1170 maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 1171 HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); 1172 rpcTimeout = 1173 conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 1174 minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, 1175 DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA); 1176 rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder()); 1177 closedScanners = CacheBuilder.newBuilder() 1178 .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build(); 1179 } 1180 1181 @Override 1182 protected boolean defaultReservoirEnabled() { 1183 return true; 1184 } 1185 1186 @Override 1187 protected ServerType getDNSServerType() { 1188 return DNS.ServerType.REGIONSERVER; 1189 } 1190 1191 @Override 1192 protected String getHostname(Configuration conf, String defaultHostname) { 1193 return conf.get("hbase.regionserver.ipc.address", defaultHostname); 1194 } 1195 1196 @Override 1197 protected String getPortConfigName() { 1198 return HConstants.REGIONSERVER_PORT; 1199 } 1200 1201 
  @Override
  protected int getDefaultPort() {
    return HConstants.DEFAULT_REGIONSERVER_PORT;
  }

  @Override
  protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  protected RpcServerInterface createRpcServer(final Server server,
    final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
    final String name) throws IOException {
    final Configuration conf = server.getConfiguration();
    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
    try {
      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, // use final
        // bindAddress
        // for this
        // server.
        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
    } catch (BindException be) {
      // Rewrap with a hint about the port property; keep the root cause.
      throw new IOException(be.getMessage() + ". To switch ports use the '"
        + HConstants.REGIONSERVER_PORT + "' configuration property.",
        be.getCause() != null ? be.getCause() : be);
    }
  }

  protected Class<?> getRpcSchedulerFactoryClass() {
    final Configuration conf = server.getConfiguration();
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  protected PriorityFunction createPriority() {
    return new RSAnnotationReadingPriorityFunction(this);
  }

  public int getScannersCount() {
    return scanners.size();
  }

  /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
  RegionScanner getScanner(long scannerId) {
    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
    return rsh == null ? null : rsh.s;
  }

  /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null. */
  private RegionScannerHolder getRegionScannerHolder(long scannerId) {
    return scanners.get(toScannerName(scannerId));
  }

  public String getScanDetailsWithId(long scannerId) {
    RegionScanner scanner = getScanner(scannerId);
    if (scanner == null) {
      return null;
    }
    StringBuilder builder = new StringBuilder();
    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
    builder.append(" operation_id: ").append(scanner.getOperationId());
    return builder.toString();
  }

  public String getScanDetailsWithRequest(ScanRequest request) {
    try {
      if (!request.hasRegion()) {
        return null;
      }
      Region region = getRegion(request.getRegion());
      StringBuilder builder = new StringBuilder();
      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
      for (NameBytesPair pair : request.getScan().getAttributeList()) {
        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
          break;
        }
      }
      return builder.toString();
    } catch (IOException ignored) {
      // Details are diagnostic only; swallow lookup failures and report nothing.
      return null;
    }
  }

  /**
   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
   */
  long getScannerVirtualTime(long scannerId) {
    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
    return rsh == null ? 0L : rsh.getNextCallSeq();
  }

  /**
   * Method to account for the size of retained cells and retained data blocks.
   * @param context rpc call context
   * @param r result to add size.
   * @param lastBlock last block to check whether we need to add the block size in context.
   * @return an object that represents the last referenced block from this response.
   */
  Object addSize(RpcCallContext context, Result r, Object lastBlock) {
    if (context != null && r != null && !r.isEmpty()) {
      for (Cell c : r.rawCells()) {
        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));

        // Since byte buffers can point all kinds of crazy places it's harder to keep track
        // of which blocks are kept alive by what byte buffer.
        // So we make a guess.
        if (c instanceof ByteBufferExtendedCell) {
          ByteBufferExtendedCell bbCell = (ByteBufferExtendedCell) c;
          ByteBuffer bb = bbCell.getValueByteBuffer();
          if (bb != lastBlock) {
            // Identity comparison on purpose: same backing buffer means same block already counted.
            context.incrementResponseBlockSize(bb.capacity());
            lastBlock = bb;
          }
        } else {
          // We're using the last block being the same as the current block as
          // a proxy for pointing to a new block. This won't be exact.
          // If there are multiple gets that bounce back and forth
          // Then it's possible that this will over count the size of
          // referenced blocks. However it's better to over count and
          // use two rpcs than to OOME the regionserver.
          byte[] valueArray = c.getValueArray();
          if (valueArray != lastBlock) {
            context.incrementResponseBlockSize(valueArray.length);
            lastBlock = valueArray;
          }
        }

      }
    }
    return lastBlock;
  }

  /** Returns Remote client's ip and port else null if can't be determined. */
  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
  static String getRemoteClientIpAndPort() {
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    if (rpcCall == null) {
      return HConstants.EMPTY_STRING;
    }
    InetAddress address = rpcCall.getRemoteAddress();
    if (address == null) {
      return HConstants.EMPTY_STRING;
    }
    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
    // scanning than a hostname anyways.
    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
  }

  /** Returns Remote client's username. */
  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
  static String getUserName() {
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    if (rpcCall == null) {
      return HConstants.EMPTY_STRING;
    }
    return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING);
  }

  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
    HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
    // Lease expiry triggers ScannerListener, which cleans up abandoned scanners.
    Lease lease = server.getLeaseManager().createLease(scannerName, this.scannerLeaseTimeoutPeriod,
      new ScannerListener(scannerName));
    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
    RpcCallback closeCallback =
      s instanceof RpcCallback ?
        (RpcCallback) s : new RegionScannerCloseCallBack(s);
    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
      needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
      + scannerName + ", " + existing;
    return rsh;
  }

  private boolean isFullRegionScan(Scan scan, HRegion region) {
    // If the scan start row equals or less than the start key of the region
    // and stop row greater than equals end key (if stop row present)
    // or if the stop row is empty
    // account this as a full region scan
    if (
      Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0
        && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0
          && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)
          || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
    ) {
      return true;
    }
    return false;
  }

  /**
   * Find the HRegion based on a region specifier
   * @param regionSpecifier the region specifier
   * @return the corresponding region
   * @throws IOException if the specifier is not null, but failed to find the region
   */
  public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException {
    return server.getRegion(regionSpecifier.getValue().toByteArray());
  }

  /**
   * Find the List of HRegions based on a list of region specifiers
   * @param regionSpecifiers the list of region specifiers
   * @return the corresponding list of regions
   * @throws IOException if any of the specifiers is not null, but failed to find the region
   */
  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
    final CacheEvictionStatsBuilder stats) {
    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
    for (RegionSpecifier regionSpecifier : regionSpecifiers) {
      try {
        regions.add(server.getRegion(regionSpecifier.getValue().toByteArray()));
      } catch (NotServingRegionException e) {
        // Record the miss in the stats instead of failing the whole lookup.
        stats.addException(regionSpecifier.getValue().toByteArray(), e);
      }
    }
    return regions;
  }

  PriorityFunction getPriority() {
    return priority;
  }

  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return server.getRegionServerRpcQuotaManager();
  }

  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return server.getRegionServerSpaceQuotaManager();
  }

  void start(ZKWatcher zkWatcher) {
    this.scannerIdGenerator = new ScannerIdGenerator(this.server.getServerName());
    internalStart(zkWatcher);
  }

  void stop() {
    closeAllScanners();
    internalStop();
  }

  /**
   * Called to verify that this server is up and running.
   */
  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
  protected void checkOpen() throws IOException {
    if (server.isAborted()) {
      throw new RegionServerAbortedException("Server " + server.getServerName() + " aborting");
    }
    if (server.isStopped()) {
      throw new RegionServerStoppedException("Server " + server.getServerName() + " stopping");
    }
    if (!server.isDataFileSystemOk()) {
      throw new RegionServerStoppedException("File system not available");
    }
    if (!server.isOnline()) {
      throw new ServerNotRunningYetException(
        "Server " + server.getServerName() + " is not running yet");
    }
  }

  /**
   * By default, put up an Admin and a Client Service. Set booleans
   * <code>hbase.regionserver.admin.executorService</code> and
   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
   * Default is that both are enabled.
   * @return immutable list of blocking services and the security info classes that this server
   *         supports
   */
  protected List<BlockingServiceAndInterface> getServices() {
    boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
    boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
    boolean clientMeta =
      getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
    if (client) {
      bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
        ClientService.BlockingInterface.class));
    }
    if (admin) {
      bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
        AdminService.BlockingInterface.class));
    }
    if (clientMeta) {
      bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
        ClientMetaService.BlockingInterface.class));
    }
    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
  }

  /**
   * Close a region on the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public CloseRegionResponse closeRegion(final RpcController controller,
    final CloseRegionRequest request) throws ServiceException {
    final ServerName sn = (request.hasDestinationServer()
      ? ProtobufUtil.toServerName(request.getDestinationServer())
      : null);

    try {
      checkOpen();
      throwOnWrongStartCode(request);
      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());

      requestCount.increment();
      if (sn == null) {
        LOG.info("Close " + encodedRegionName + " without moving");
      } else {
        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
      }
      boolean closed = server.closeRegion(encodedRegionName, false, sn);
      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Compact a region on the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public CompactRegionResponse compactRegion(final RpcController controller,
    final CompactRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      // Quota support is enabled, the requesting user is not system/super user
      // and a quota policy is enforced that disables compactions.
1544 if ( 1545 QuotaUtil.isQuotaEnabled(getConfiguration()) 1546 && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null)) 1547 && this.server.getRegionServerSpaceQuotaManager() 1548 .areCompactionsDisabled(region.getTableDescriptor().getTableName()) 1549 ) { 1550 throw new DoNotRetryIOException( 1551 "Compactions on this region are " + "disabled due to a space quota violation."); 1552 } 1553 region.startRegionOperation(Operation.COMPACT_REGION); 1554 LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString()); 1555 boolean major = request.hasMajor() && request.getMajor(); 1556 if (request.hasFamily()) { 1557 byte[] family = request.getFamily().toByteArray(); 1558 String log = "User-triggered " + (major ? "major " : "") + "compaction for region " 1559 + region.getRegionInfo().getRegionNameAsString() + " and family " 1560 + Bytes.toString(family); 1561 LOG.trace(log); 1562 region.requestCompaction(family, log, Store.PRIORITY_USER, major, 1563 CompactionLifeCycleTracker.DUMMY); 1564 } else { 1565 String log = "User-triggered " + (major ? 
"major " : "") + "compaction for region " 1566 + region.getRegionInfo().getRegionNameAsString(); 1567 LOG.trace(log); 1568 region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY); 1569 } 1570 return CompactRegionResponse.newBuilder().build(); 1571 } catch (IOException ie) { 1572 throw new ServiceException(ie); 1573 } 1574 } 1575 1576 @Override 1577 public CompactionSwitchResponse compactionSwitch(RpcController controller, 1578 CompactionSwitchRequest request) throws ServiceException { 1579 rpcPreCheck("compactionSwitch"); 1580 final CompactSplit compactSplitThread = server.getCompactSplitThread(); 1581 requestCount.increment(); 1582 boolean prevState = compactSplitThread.isCompactionsEnabled(); 1583 CompactionSwitchResponse response = 1584 CompactionSwitchResponse.newBuilder().setPrevState(prevState).build(); 1585 if (prevState == request.getEnabled()) { 1586 // passed in requested state is same as current state. No action required 1587 return response; 1588 } 1589 compactSplitThread.switchCompaction(request.getEnabled()); 1590 return response; 1591 } 1592 1593 /** 1594 * Flush a region on the region server. 1595 * @param controller the RPC controller 1596 * @param request the request n 1597 */ 1598 @Override 1599 @QosPriority(priority = HConstants.ADMIN_QOS) 1600 public FlushRegionResponse flushRegion(final RpcController controller, 1601 final FlushRegionRequest request) throws ServiceException { 1602 try { 1603 checkOpen(); 1604 requestCount.increment(); 1605 HRegion region = getRegion(request.getRegion()); 1606 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); 1607 boolean shouldFlush = true; 1608 if (request.hasIfOlderThanTs()) { 1609 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs(); 1610 } 1611 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); 1612 if (shouldFlush) { 1613 boolean writeFlushWalMarker = 1614 request.hasWriteFlushWalMarker() ? 
request.getWriteFlushWalMarker() : false; 1615 // Go behind the curtain so we can manage writing of the flush WAL marker 1616 HRegion.FlushResultImpl flushResult = null; 1617 if (request.hasFamily()) { 1618 List families = new ArrayList(); 1619 families.add(request.getFamily().toByteArray()); 1620 flushResult = 1621 region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1622 } else { 1623 flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1624 } 1625 boolean compactionNeeded = flushResult.isCompactionNeeded(); 1626 if (compactionNeeded) { 1627 server.getCompactSplitThread().requestSystemCompaction(region, 1628 "Compaction through user triggered flush"); 1629 } 1630 builder.setFlushed(flushResult.isFlushSucceeded()); 1631 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker); 1632 } 1633 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores()); 1634 return builder.build(); 1635 } catch (DroppedSnapshotException ex) { 1636 // Cache flush can fail in a few places. If it fails in a critical 1637 // section, we get a DroppedSnapshotException and a replay of wal 1638 // is required. Currently the only way to do this is a restart of 1639 // the server. 1640 server.abort("Replay of WAL required. 
Forcing server shutdown", ex); 1641 throw new ServiceException(ex); 1642 } catch (IOException ie) { 1643 throw new ServiceException(ie); 1644 } 1645 } 1646 1647 @Override 1648 @QosPriority(priority = HConstants.ADMIN_QOS) 1649 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, 1650 final GetOnlineRegionRequest request) throws ServiceException { 1651 try { 1652 checkOpen(); 1653 requestCount.increment(); 1654 Map<String, HRegion> onlineRegions = server.getOnlineRegions(); 1655 List<RegionInfo> list = new ArrayList<>(onlineRegions.size()); 1656 for (HRegion region : onlineRegions.values()) { 1657 list.add(region.getRegionInfo()); 1658 } 1659 list.sort(RegionInfo.COMPARATOR); 1660 return ResponseConverter.buildGetOnlineRegionResponse(list); 1661 } catch (IOException ie) { 1662 throw new ServiceException(ie); 1663 } 1664 } 1665 1666 // Master implementation of this Admin Service differs given it is not 1667 // able to supply detail only known to RegionServer. See note on 1668 // MasterRpcServers#getRegionInfo. 
1669 @Override 1670 @QosPriority(priority = HConstants.ADMIN_QOS) 1671 public GetRegionInfoResponse getRegionInfo(final RpcController controller, 1672 final GetRegionInfoRequest request) throws ServiceException { 1673 try { 1674 checkOpen(); 1675 requestCount.increment(); 1676 HRegion region = getRegion(request.getRegion()); 1677 RegionInfo info = region.getRegionInfo(); 1678 byte[] bestSplitRow; 1679 if (request.hasBestSplitRow() && request.getBestSplitRow()) { 1680 bestSplitRow = region.checkSplit(true).orElse(null); 1681 // when all table data are in memstore, bestSplitRow = null 1682 // try to flush region first 1683 if (bestSplitRow == null) { 1684 region.flush(true); 1685 bestSplitRow = region.checkSplit(true).orElse(null); 1686 } 1687 } else { 1688 bestSplitRow = null; 1689 } 1690 GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder(); 1691 builder.setRegionInfo(ProtobufUtil.toRegionInfo(info)); 1692 if (request.hasCompactionState() && request.getCompactionState()) { 1693 builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState())); 1694 } 1695 builder.setSplittable(region.isSplittable()); 1696 builder.setMergeable(region.isMergeable()); 1697 if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) { 1698 builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow)); 1699 } 1700 return builder.build(); 1701 } catch (IOException ie) { 1702 throw new ServiceException(ie); 1703 } 1704 } 1705 1706 @Override 1707 @QosPriority(priority = HConstants.ADMIN_QOS) 1708 public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request) 1709 throws ServiceException { 1710 1711 List<HRegion> regions; 1712 if (request.hasTableName()) { 1713 TableName tableName = ProtobufUtil.toTableName(request.getTableName()); 1714 regions = server.getRegions(tableName); 1715 } else { 1716 regions = server.getRegions(); 1717 } 1718 List<RegionLoad> rLoads = new 
ArrayList<>(regions.size()); 1719 RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder(); 1720 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder(); 1721 1722 try { 1723 for (HRegion region : regions) { 1724 rLoads.add(server.createRegionLoad(region, regionLoadBuilder, regionSpecifier)); 1725 } 1726 } catch (IOException e) { 1727 throw new ServiceException(e); 1728 } 1729 GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder(); 1730 builder.addAllRegionLoads(rLoads); 1731 return builder.build(); 1732 } 1733 1734 @Override 1735 @QosPriority(priority = HConstants.ADMIN_QOS) 1736 public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller, 1737 ClearCompactionQueuesRequest request) throws ServiceException { 1738 LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/" 1739 + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue"); 1740 ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder(); 1741 requestCount.increment(); 1742 if (clearCompactionQueues.compareAndSet(false, true)) { 1743 final CompactSplit compactSplitThread = server.getCompactSplitThread(); 1744 try { 1745 checkOpen(); 1746 server.getRegionServerCoprocessorHost().preClearCompactionQueues(); 1747 for (String queueName : request.getQueueNameList()) { 1748 LOG.debug("clear " + queueName + " compaction queue"); 1749 switch (queueName) { 1750 case "long": 1751 compactSplitThread.clearLongCompactionsQueue(); 1752 break; 1753 case "short": 1754 compactSplitThread.clearShortCompactionsQueue(); 1755 break; 1756 default: 1757 LOG.warn("Unknown queue name " + queueName); 1758 throw new IOException("Unknown queue name " + queueName); 1759 } 1760 } 1761 server.getRegionServerCoprocessorHost().postClearCompactionQueues(); 1762 } catch (IOException ie) { 1763 throw new ServiceException(ie); 1764 } finally { 1765 clearCompactionQueues.set(false); 1766 } 1767 } 
else { 1768 LOG.warn("Clear compactions queue is executing by other admin."); 1769 } 1770 return respBuilder.build(); 1771 } 1772 1773 /** 1774 * Get some information of the region server. 1775 * @param controller the RPC controller 1776 * @param request the request n 1777 */ 1778 @Override 1779 @QosPriority(priority = HConstants.ADMIN_QOS) 1780 public GetServerInfoResponse getServerInfo(final RpcController controller, 1781 final GetServerInfoRequest request) throws ServiceException { 1782 try { 1783 checkOpen(); 1784 } catch (IOException ie) { 1785 throw new ServiceException(ie); 1786 } 1787 requestCount.increment(); 1788 int infoPort = server.getInfoServer() != null ? server.getInfoServer().getPort() : -1; 1789 return ResponseConverter.buildGetServerInfoResponse(server.getServerName(), infoPort); 1790 } 1791 1792 @Override 1793 @QosPriority(priority = HConstants.ADMIN_QOS) 1794 public GetStoreFileResponse getStoreFile(final RpcController controller, 1795 final GetStoreFileRequest request) throws ServiceException { 1796 try { 1797 checkOpen(); 1798 HRegion region = getRegion(request.getRegion()); 1799 requestCount.increment(); 1800 Set<byte[]> columnFamilies; 1801 if (request.getFamilyCount() == 0) { 1802 columnFamilies = region.getTableDescriptor().getColumnFamilyNames(); 1803 } else { 1804 columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR); 1805 for (ByteString cf : request.getFamilyList()) { 1806 columnFamilies.add(cf.toByteArray()); 1807 } 1808 } 1809 int nCF = columnFamilies.size(); 1810 List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][])); 1811 GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder(); 1812 builder.addAllStoreFile(fileList); 1813 return builder.build(); 1814 } catch (IOException ie) { 1815 throw new ServiceException(ie); 1816 } 1817 } 1818 1819 private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException { 1820 if (!request.hasServerStartCode()) { 1821 
LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList()); 1822 return; 1823 } 1824 throwOnWrongStartCode(request.getServerStartCode()); 1825 } 1826 1827 private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException { 1828 if (!request.hasServerStartCode()) { 1829 LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion()); 1830 return; 1831 } 1832 throwOnWrongStartCode(request.getServerStartCode()); 1833 } 1834 1835 private void throwOnWrongStartCode(long serverStartCode) throws ServiceException { 1836 // check that we are the same server that this RPC is intended for. 1837 if (server.getServerName().getStartcode() != serverStartCode) { 1838 throw new ServiceException(new DoNotRetryIOException( 1839 "This RPC was intended for a " + "different server with startCode: " + serverStartCode 1840 + ", this server is: " + server.getServerName())); 1841 } 1842 } 1843 1844 private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException { 1845 if (req.getOpenRegionCount() > 0) { 1846 for (OpenRegionRequest openReq : req.getOpenRegionList()) { 1847 throwOnWrongStartCode(openReq); 1848 } 1849 } 1850 if (req.getCloseRegionCount() > 0) { 1851 for (CloseRegionRequest closeReq : req.getCloseRegionList()) { 1852 throwOnWrongStartCode(closeReq); 1853 } 1854 } 1855 } 1856 1857 /** 1858 * Open asynchronously a region or a set of regions on the region server. The opening is 1859 * coordinated by ZooKeeper, and this method requires the znode to be created before being called. 1860 * As a consequence, this method should be called only from the master. 1861 * <p> 1862 * Different manages states for the region are: 1863 * </p> 1864 * <ul> 1865 * <li>region not opened: the region opening will start asynchronously.</li> 1866 * <li>a close is already in progress: this is considered as an error.</li> 1867 * <li>an open is already in progress: this new open request will be ignored. 
   * This is important
   * because the Master can do multiple requests if it crashes.</li>
   * <li>the region is already opened: this new open request will be ignored.</li>
   * </ul>
   * <p>
   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
   * errors are put in the response as FAILED_OPENING.
   * </p>
   * @param controller the RPC controller
   * @param request    the open region request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public OpenRegionResponse openRegion(final RpcController controller,
    final OpenRegionRequest request) throws ServiceException {
    requestCount.increment();
    throwOnWrongStartCode(request);

    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
    final int regionCount = request.getOpenInfoCount();
    // Cache table descriptors across the loop so each table is looked up at most once.
    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
    final boolean isBulkAssign = regionCount > 1;
    try {
      checkOpen();
    } catch (IOException ie) {
      // Server not yet open. Normally fatal for the request, except when the single region
      // being assigned is meta: then wait briefly for this server to finish initializing.
      TableName tableName = null;
      if (regionCount == 1) {
        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
          request.getOpenInfo(0).getRegion();
        if (ri != null) {
          tableName = ProtobufUtil.toTableName(ri.getTableName());
        }
      }
      if (!TableName.META_TABLE_NAME.equals(tableName)) {
        throw new ServiceException(ie);
      }
      // We are assigning meta, wait a little for regionserver to finish initialization.
      // Default to quarter of RPC timeout
      int timeout = server.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
      synchronized (server.online) {
        try {
          // Poll until the server comes online, stops, or the deadline passes.
          while (
            EnvironmentEdgeManager.currentTime() <= endTime && !server.isStopped()
              && !server.isOnline()
          ) {
            server.online.wait(server.getMsgInterval());
          }
          checkOpen();
        } catch (InterruptedException t) {
          // Restore the interrupt status before propagating.
          Thread.currentThread().interrupt();
          throw new ServiceException(t);
        } catch (IOException e) {
          throw new ServiceException(e);
        }
      }
    }

    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;

    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableDescriptor htd;
      try {
        String encodedName = region.getEncodedName();
        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
        final HRegion onlineRegion = server.getRegion(encodedName);
        if (onlineRegion != null) {
          // The region is already online. This should not happen any more.
          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
            + ", which is already online";
          LOG.warn(error);
          // server.abort(error);
          // throw new IOException(error);
          builder.addOpeningState(RegionOpeningState.OPENED);
          continue;
        }
        LOG.info("Open " + region.getRegionNameAsString());

        // Mark the region as in-transition; `previous` tells us what, if anything, was
        // already happening to it (TRUE = open in progress, FALSE = close in progress).
        final Boolean previous =
          server.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);

        if (Boolean.FALSE.equals(previous)) {
          if (server.getRegion(encodedName) != null) {
            // There is a close in progress. This should not happen any more.
            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
              + ", which we are already trying to CLOSE";
            server.abort(error);
            throw new IOException(error);
          }
          // Flip the in-transition marker from "closing" to "opening".
          server.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
        }

        if (Boolean.TRUE.equals(previous)) {
          // An open is in progress. This is supported, but let's log this.
          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
            + ", which we are already trying to OPEN"
            + " - ignoring this new request for this region.");
        }

        // We are opening this region. If it moves back and forth for whatever reason, we don't
        // want to keep returning the stale moved record while we are opening/if we close again.
        server.removeFromMovedRegions(region.getEncodedName());

        if (previous == null || !previous.booleanValue()) {
          htd = htds.get(region.getTable());
          if (htd == null) {
            htd = server.getTableDescriptors().get(region.getTable());
            htds.put(region.getTable(), htd);
          }
          if (htd == null) {
            throw new IOException("Missing table descriptor for " + region.getEncodedName());
          }
          // If there is no action in progress, we can submit a specific handler.
          // Need to pass the expected version in the constructor.
          if (server.getExecutorService() == null) {
            LOG.info("No executor executorService; skipping open request");
          } else {
            if (region.isMetaRegion()) {
              server.getExecutorService()
                .submit(new OpenMetaHandler(server, server, region, htd, masterSystemTime));
            } else {
              if (regionOpenInfo.getFavoredNodesCount() > 0) {
                server.updateRegionFavoredNodesMapping(region.getEncodedName(),
                  regionOpenInfo.getFavoredNodesList());
              }
              // High-priority tables and system tables get the priority open handler.
              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
                server.getExecutorService().submit(
                  new OpenPriorityRegionHandler(server, server, region, htd, masterSystemTime));
              } else {
                server.getExecutorService()
                  .submit(new OpenRegionHandler(server, server, region, htd, masterSystemTime));
              }
            }
          }
        }

        builder.addOpeningState(RegionOpeningState.OPENED);
      } catch (IOException ie) {
        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
        if (isBulkAssign) {
          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
        } else {
          throw new ServiceException(ie);
        }
      }
    }
    return builder.build();
  }
  /**
   * Warmup a region on this server. This method should only be called by Master. It synchronously
   * opens the region and closes the region bringing the most important pages in cache.
   */
  @Override
  public WarmupRegionResponse warmupRegion(final RpcController controller,
    final WarmupRegionRequest request) throws ServiceException {
    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
    try {
      checkOpen();
      String encodedName = region.getEncodedName();
      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
      final HRegion onlineRegion = server.getRegion(encodedName);
      // Warmup only makes sense for a region that is neither online nor in transition here.
      if (onlineRegion != null) {
        LOG.info("{} is online; skipping warmup", region);
        return response;
      }
      TableDescriptor htd = server.getTableDescriptors().get(region.getTable());
      if (server.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
        LOG.info("{} is in transition; skipping warmup", region);
        return response;
      }
      LOG.info("Warmup {}", region.getRegionNameAsString());
      HRegion.warmupHRegion(region, htd, server.getWAL(region), server.getConfiguration(), server,
        null);
    } catch (IOException ie) {
      LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
      throw new ServiceException(ie);
    }

    return response;
  }

  // Detach and return the controller's cell scanner so the payload is consumed exactly once.
  private CellScanner getAndReset(RpcController controller) {
    HBaseRpcController hrc = (HBaseRpcController) controller;
    CellScanner cells = hrc.cellScanner();
    hrc.setCellScanner(null);
    return cells;
  }

  /**
   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
   * that the given mutations will be durable on the receiving RS if this method returns without any
   * exception.
   * @param controller the RPC controller
   * @param request    the request
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for
   *             compatibility with old region replica implementation. Now we will use
   *             {@code replicateToReplica} method instead.
   */
  @Deprecated
  @Override
  @QosPriority(priority = HConstants.REPLAY_QOS)
  public ReplicateWALEntryResponse replay(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    CellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      // All entries in one request must target the same region; resolve it from the first entry.
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      RegionCoprocessorHost coprocessorHost =
        ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
          ? region.getCoprocessorHost()
          : null; // do not invoke coprocessors if this is a secondary region replica
      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();

      // Skip adding the edits to WAL if this is a secondary region replica
      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;

      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException("Replay request contains entries from multiple "
            + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
            + entry.getKey().getEncodedRegionName());
        }
        if (server.nonceManager != null && isPrimary) {
          // Report nonces seen in the WAL so replayed operations stay idempotent.
          long nonceGroup =
            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          server.nonceManager.reportOperationFromWal(nonceGroup, nonce,
            entry.getKey().getWriteTime());
        }
        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
        List<MutationReplay> edits =
          WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
        if (coprocessorHost != null) {
          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
          // KeyValue.
          if (
            coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
              walEntry.getSecond())
          ) {
            // if bypass this log entry, ignore it ...
            continue;
          }
          walEntries.add(walEntry);
        }
        if (edits != null && !edits.isEmpty()) {
          // HBASE-17924
          // sort to improve lock efficiency
          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
          long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
            ? entry.getKey().getOrigSequenceNumber()
            : entry.getKey().getLogSequenceNumber();
          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
          // check if it's a partial success
          for (int i = 0; result != null && i < result.length; i++) {
            if (result[i] != OperationStatus.SUCCESS) {
              throw new IOException(result[i].getExceptionMsg());
            }
          }
        }
      }

      // sync wal at the end because ASYNC_WAL is used above
      WAL wal = region.getWAL();
      if (wal != null) {
        wal.sync();
      }

      if (coprocessorHost != null) {
        for (Pair<WALKey, WALEdit> entry : walEntries) {
          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
            entry.getSecond());
        }
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record replay latency even on failure.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
      }
    }
  }

  /**
   * Replay the given changes on a secondary replica
   */
  @Override
  public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
    ReplicateWALEntryRequest request) throws ServiceException {
    CellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      // This endpoint only serves secondary replicas; primaries write through the normal path.
      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
        throw new DoNotRetryIOException(
          "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
      }
      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException(
            "ReplicateToReplica request contains entries from multiple " + "regions. First region:"
              + regionName.toStringUtf8() + " , other region:"
              + entry.getKey().getEncodedRegionName());
        }
        region.replayWALEntry(entry, cells);
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  // Rejects incoming replication when the sink side of a sync-replication peer is (or is
  // becoming) ACTIVE/DOWNGRADE_ACTIVE for the entries' table.
  private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
    ReplicationSourceService replicationSource = server.getReplicationSourceService();
    if (replicationSource == null || entries.isEmpty()) {
      return;
    }
    // We can ensure that all entries are for one peer, so only need to check one entry's
    // table name. if the table hit sync replication at peer side and the peer cluster
    // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply
    // those entries according to the design doc.
    TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray());
    if (
      replicationSource.getSyncReplicationPeerInfoProvider().checkState(table,
        RejectReplicationRequestStateChecker.get())
    ) {
      throw new DoNotRetryIOException(
        "Reject to apply to sink cluster because sync replication state of sink cluster "
          + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table);
    }
  }
2221 * @param controller the RPC controller 2222 * @param request the request 2223 */ 2224 @Override 2225 @QosPriority(priority = HConstants.REPLICATION_QOS) 2226 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, 2227 final ReplicateWALEntryRequest request) throws ServiceException { 2228 try { 2229 checkOpen(); 2230 if (server.getReplicationSinkService() != null) { 2231 requestCount.increment(); 2232 List<WALEntry> entries = request.getEntryList(); 2233 checkShouldRejectReplicationRequest(entries); 2234 CellScanner cellScanner = getAndReset(controller); 2235 server.getRegionServerCoprocessorHost().preReplicateLogEntries(); 2236 server.getReplicationSinkService().replicateLogEntries(entries, cellScanner, 2237 request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), 2238 request.getSourceHFileArchiveDirPath()); 2239 server.getRegionServerCoprocessorHost().postReplicateLogEntries(); 2240 return ReplicateWALEntryResponse.newBuilder().build(); 2241 } else { 2242 throw new ServiceException("Replication services are not initialized yet"); 2243 } 2244 } catch (IOException ie) { 2245 throw new ServiceException(ie); 2246 } 2247 } 2248 2249 /** 2250 * Roll the WAL writer of the region server. 2251 * @param controller the RPC controller 2252 * @param request the request n 2253 */ 2254 @Override 2255 public RollWALWriterResponse rollWALWriter(final RpcController controller, 2256 final RollWALWriterRequest request) throws ServiceException { 2257 try { 2258 checkOpen(); 2259 requestCount.increment(); 2260 server.getRegionServerCoprocessorHost().preRollWALWriterRequest(); 2261 server.getWalRoller().requestRollAll(); 2262 server.getRegionServerCoprocessorHost().postRollWALWriterRequest(); 2263 RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder(); 2264 return builder.build(); 2265 } catch (IOException ie) { 2266 throw new ServiceException(ie); 2267 } 2268 } 2269 2270 /** 2271 * Stop the region server. 
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public StopServerResponse stopServer(final RpcController controller,
    final StopServerRequest request) throws ServiceException {
    rpcPreCheck("stopServer");
    requestCount.increment();
    String reason = request.getReason();
    server.stop(reason);
    return StopServerResponse.newBuilder().build();
  }

  /**
   * Update the favored-nodes mapping for the regions named in the request.
   * @return a response whose count echoes the number of region update infos in the request
   */
  @Override
  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
    UpdateFavoredNodesRequest request) throws ServiceException {
    rpcPreCheck("updateFavoredNodes");
    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
      // Only update the mapping when favored nodes were actually supplied for this region.
      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
        server.updateRegionFavoredNodesMapping(hri.getEncodedName(),
          regionUpdateInfo.getFavoredNodesList());
      }
    }
    respBuilder.setResponse(openInfoList.size());
    return respBuilder.build();
  }

  /**
   * Atomically bulk load several HFiles into an open region
   * @return true if successful, false is failed but recoverably (no action)
   * @throws ServiceException if failed unrecoverably
   */
  @Override
  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
    final BulkLoadHFileRequest request) throws ServiceException {
    long start = EnvironmentEdgeManager.currentTime();
    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
    // If this cluster's id is already in the request's cluster-id list, the load has looped
    // back to us (replication cycle); report success without re-loading.
    if (clusterIds.contains(this.server.getClusterId())) {
      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
    } else {
      clusterIds.add(this.server.getClusterId());
    }
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
      long sizeToBeLoaded = -1;

      // Check to see if this bulk load would exceed the space quota for this table
      if (spaceQuotaEnabled) {
        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
        SpaceViolationPolicyEnforcement enforcement =
          activeSpaceQuotas.getPolicyEnforcement(region);
        if (enforcement != null) {
          // Bulk loads must still be atomic. We must enact all or none.
          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
          for (FamilyPath familyPath : request.getFamilyPathList()) {
            filePaths.add(familyPath.getPath());
          }
          // Check if the batch of files exceeds the current quota
          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
        }
      }
      // secure bulk load
      Map<byte[], List<Path>> map =
        server.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
      builder.setLoaded(map != null);
      if (map != null) {
        // Treat any negative size as a flag to "ignore" updating the region size as that is
        // not possible to occur in real life (cannot bulk load a file with negative size)
        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
              + sizeToBeLoaded + " bytes");
          }
          // Inform space quotas of the new files for this region
          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
            sizeToBeLoaded);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record the latency of the whole bulk load attempt, successful or not.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
      }
    }
  }

  /**
   * Prepare a region for a subsequent secure bulk load and hand back the bulk token that the
   * client must present on load and cleanup.
   */
  @Override
  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
    PrepareBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      String bulkToken = server.getSecureBulkLoadManager().prepareBulkLoad(region, request);
      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
      builder.setBulkToken(bulkToken);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Release the staging resources created by a prior {@code prepareBulkLoad}.
   */
  @Override
  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
    CleanupBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      server.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
      return CleanupBulkLoadResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Execute a region-scoped coprocessor endpoint call and marshal its result back to the client.
   */
  @Override
  public CoprocessorServiceResponse execService(final RpcController controller,
    final CoprocessorServiceRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      Message result = execServiceOnRegion(region, request.getCall());
      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
        region.getRegionInfo().getRegionName()));
      // TODO: COPIES!!!!!!
      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
        org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Pick the filesystem that hosts the given bulk-load files.
   * @param filePaths candidate file paths; when empty, fall back to this server's own filesystem
   */
  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
    if (filePaths.isEmpty()) {
      // local hdfs
      return server.getFileSystem();
    }
    // source hdfs
    return new Path(filePaths.get(0)).getFileSystem(server.getConfiguration());
  }

  // Runs a coprocessor endpoint call against the region using a fresh server-side controller.
  private Message execServiceOnRegion(HRegion region,
    final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
    // ignore the passed in controller (from the serialized call)
    ServerRpcController execController = new ServerRpcController();
    return region.execService(execController, serviceCall);
  }

  /**
   * @return true when the region's table is in a sync-replication state that requires client
   *         requests to be rejected (the region is acting as a STANDBY replica).
   */
  private boolean shouldRejectRequestsFromClient(HRegion region) {
    TableName table = region.getRegionInfo().getTable();
    ReplicationSourceService service = server.getReplicationSourceService();
    return service != null && service.getSyncReplicationPeerInfoProvider().checkState(table,
      RejectRequestsFromClientStateChecker.get());
  }

  // Throws when the region is in STANDBY state; used to fence client reads and writes.
  private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
    if (shouldRejectRequestsFromClient(region)) {
      throw new DoNotRetryIOException(
        region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state.");
    }
  }

  /**
   * Get data from a table.
   * @param controller the RPC controller
   * @param request the get request
   */
  @Override
  public GetResponse get(final RpcController controller, final GetRequest request)
    throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    OperationQuota quota = null;
    HRegion region = null;
    try {
      checkOpen();
      requestCount.increment();
      rpcGetRequestCount.increment();
      region = getRegion(request.getRegion());
      rejectIfInStandByState(region);

      GetResponse.Builder builder = GetResponse.newBuilder();
      ClientProtos.Get get = request.getGet();
      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
          + "reverse Scan.");
      }
      Boolean existence = null;
      Result r = null;
      RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);

      Get clientGet = ProtobufUtil.toGet(get);
      // Existence-only gets may be answered entirely by a coprocessor's preExists hook.
      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
        existence = region.getCoprocessorHost().preExists(clientGet);
      }
      if (existence == null) {
        if (context != null) {
          r = get(clientGet, (region), null, context);
        } else {
          // for test purpose
          r = region.get(clientGet);
        }
        if (get.getExistenceOnly()) {
          boolean exists = r.getExists();
          if (region.getCoprocessorHost() != null) {
            exists = region.getCoprocessorHost().postExists(clientGet, exists);
          }
          existence = exists;
        }
      }
      if (existence != null) {
        ClientProtos.Result pbr =
          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
        builder.setResult(pbr);
      } else if (r != null) {
        ClientProtos.Result pbr;
        // Ship cells in a side cell block when the client supports it (1.3+), avoiding a
        // protobuf copy of the data.
        if (
          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
        ) {
          pbr = ProtobufUtil.toResultNoData(r);
          ((HBaseRpcController) controller)
            .setCellScanner(CellUtil.createCellScanner(r.rawCells()));
          addSize(context, r, null);
        } else {
          pbr = ProtobufUtil.toResult(r);
        }
        builder.setResult(pbr);
      }
      // r.cells is null when an table.exists(get) call
      if (r != null && r.rawCells() != null) {
        quota.addGetResult(r);
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        TableDescriptor td = region != null ? region.getTableDescriptor() : null;
        if (td != null) {
          metricsRegionServer.updateGet(td.getTableName(),
            EnvironmentEdgeManager.currentTime() - before);
        }
      }
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Server-side get implementation; mirrors {@code HRegion#get} but registers the scanner with
   * the RPC call context (or the supplied close callback) so it is closed when the call
   * completes.
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
    RpcCallContext context) throws IOException {
    region.prepareGet(get);
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    long before = EnvironmentEdgeManager.currentTime();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        region.metricsUpdateForGet(results, before);
        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
          stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet(results, before);

    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
  }

  /**
   * Warn on (and optionally reject) multi requests whose total row count exceeds the configured
   * threshold (HBASE-18023).
   * @throws ServiceException when {@code rejectRowsWithSizeOverThreshold} is set and the batch
   *           exceeds the threshold
   */
  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
    int sum = 0;
    String firstRegionName = null;
    for (RegionAction regionAction : request.getRegionActionList()) {
      if (sum == 0) {
        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
      }
      sum += regionAction.getActionCount();
    }
    if (sum > rowSizeWarnThreshold) {
      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
        + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
        + RpcServer.getRequestUserName().orElse(null) + "/"
        + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
      if (rejectRowsWithSizeOverThreshold) {
        throw new ServiceException(
          "Rejecting large batch operation for current batch with firstRegionName: "
            + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
            + rowSizeWarnThreshold);
      }
    }
  }

  /**
   * Record a whole-RegionAction failure on the response and skip the cells that belonged to its
   * mutations so the shared CellScanner stays aligned for subsequent actions.
   */
  private void failRegionAction(MultiResponse.Builder responseBuilder,
    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
    CellScanner cellScanner, Throwable error) {
    rpcServer.getMetrics().exception(error);
    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
    // All Mutations in this RegionAction not executed as we can not see the Region online here
    // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
    // corresponding to these Mutations.
    if (cellScanner != null) {
      skipCellsForMutations(regionAction.getActionList(), cellScanner);
    }
  }

  // Replication edits carry a special mutation attribute; only puts and deletes qualify.
  private boolean isReplicationRequest(Action action) {
    // replication request can only be put or delete.
    if (!action.hasMutation()) {
      return false;
    }
    MutationProto mutation = action.getMutation();
    MutationType type = mutation.getMutateType();
    if (type != MutationType.PUT && type != MutationType.DELETE) {
      return false;
    }
    // replication will set a special attribute so we can make use of it to decide whether a request
    // is for replication.
    return mutation.getAttributeList().stream().map(p -> p.getName())
      .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
  }

  /**
   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
   * @param rpcc the RPC controller
   * @param request the multi request
   */
  @Override
  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
    throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }

    checkBatchSizeAndLogLargeSize(request);

    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
    // Clear the scanner off the controller so we are not holding the reference across the call.
    if (controller != null) {
      controller.setCellScanner(null);
    }

    long nonceGroup = request.hasNonceGroup() ?
      request.getNonceGroup() : HConstants.NO_NONCE;

    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
    this.rpcMultiRequestCount.increment();
    this.requestCount.increment();
    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();

    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
    // following logic is for backward compatibility as old clients still use
    // MultiRequest#condition in case of checkAndMutate with RowMutations.
    if (request.hasCondition()) {
      if (request.getRegionActionList().isEmpty()) {
        // If the region action list is empty, do nothing.
        responseBuilder.setProcessed(true);
        return responseBuilder.build();
      }

      RegionAction regionAction = request.getRegionAction(0);

      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
      // we can assume regionAction.getAtomic() is true here.
      assert regionAction.getAtomic();

      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        return responseBuilder.build();
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
        // We only allow replication in standby state and it will not set the atomic flag.
        if (rejectIfFromClient) {
          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
            new DoNotRetryIOException(
              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
          return responseBuilder.build();
        }

        try {
          CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
            cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
          responseBuilder.setProcessed(result.isSuccess());
          ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
            ClientProtos.ResultOrException.newBuilder();
          for (int i = 0; i < regionAction.getActionCount(); i++) {
            // To unify the response format with doNonAtomicRegionMutation and read through
            // client's AsyncProcess we have to add an empty result instance per operation
            resultOrExceptionOrBuilder.clear();
            resultOrExceptionOrBuilder.setIndex(i);
            regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
          }
        } catch (IOException e) {
          rpcServer.getMetrics().exception(e);
          // As it's an atomic operation with a condition, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        }
      } finally {
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
      }
      return responseBuilder.build();
    }

    // this will contain all the cells that we need to return. It's created later, if needed.
    List<CellScannable> cellsToReturn = null;
    RegionScannersCloseCallBack closeCallBack = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
      new HashMap<>(request.getRegionActionCount());

    for (RegionAction regionAction : request.getRegionActionList()) {
      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();
      regionActionResultBuilder.clear();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        continue; // For this region it's a failure.
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);

        if (regionAction.hasCondition()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }

          try {
            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
              ClientProtos.ResultOrException.newBuilder();
            if (regionAction.getActionCount() == 1) {
              // Single-mutation checkAndMutate.
              CheckAndMutateResult result =
                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              resultOrExceptionOrBuilder.setIndex(0);
              if (result.getResult() != null) {
                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
              }
              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
            } else {
              // checkAndMutate with a RowMutations-style list of actions.
              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              for (int i = 0; i < regionAction.getActionCount(); i++) {
                if (i == 0 && result.getResult() != null) {
                  // Set the result of the Increment/Append operations to the first element of the
                  // ResultOrException list
                  resultOrExceptionOrBuilder.setIndex(i);
                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
                  continue;
                }
                // To unify the response format with doNonAtomicRegionMutation and read through
                // client's AsyncProcess we have to add an empty result instance per operation
                resultOrExceptionOrBuilder.clear();
                resultOrExceptionOrBuilder.setIndex(i);
                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
              }
            }
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's an atomic operation with a condition, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }
          try {
            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
              cellScanner, nonceGroup, spaceQuotaEnforcement);
            regionActionResultBuilder.setProcessed(true);
            // We no longer use MultiResponse#processed. Instead, we use
            // RegionActionResult#processed. This is for backward compatibility for old clients.
            responseBuilder.setProcessed(true);
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's atomic, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else {
          if (
            rejectIfFromClient && regionAction.getActionCount() > 0
              && !isReplicationRequest(regionAction.getAction(0))
          ) {
            // fail if it is not a replication request
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }
          // doNonAtomicRegionMutation manages the exception internally
          if (context != null && closeCallBack == null) {
            // An RpcCallBack that creates a list of scanners that needs to perform callBack
            // operation on completion of multiGets.
            // Set this only once
            closeCallBack = new RegionScannersCloseCallBack();
            context.setCallBack(closeCallBack);
          }
          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
            spaceQuotaEnforcement);
        }
      } finally {
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        regionStats.put(regionSpecifier, regionLoadStats);
      }
    }
    // Load the controller with the Cells to return.
    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
    }

    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
      builder.addRegion(stat.getKey());
      builder.addStat(stat.getValue());
    }
    responseBuilder.setRegionStatistics(builder);
    return responseBuilder.build();
  }

  // Advances the shared CellScanner past the cells of every mutation in the given actions.
  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    for (Action action : actions) {
      skipCellsForMutation(action, cellScanner);
    }
  }

  // Advances the shared CellScanner past the cells associated with a single mutation, if any.
  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    try {
      if (action.hasMutation()) {
        MutationProto m = action.getMutation();
        if (m.hasAssociatedCellCount()) {
          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
            cellScanner.advance();
          }
        }
      }
    } catch (IOException e) {
      // No need to handle these individual Mutation level issues. Any way this entire RegionAction
      // marked as failed as we could not see the Region here. At client side the top level
      // RegionAction exception will be considered first.
      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
    }
  }

  /**
   * Mutate data in a table.
   * @param rpcc the RPC controller
   * @param request the mutate request
   */
  @Override
  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
    throws ServiceException {
    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
    OperationQuota quota = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    // Clear scanner so we are not holding on to reference across call.
    if (controller != null) {
      controller.setCellScanner(null);
    }
    try {
      checkOpen();
      requestCount.increment();
      rpcMutateRequestCount.increment();
      HRegion region = getRegion(request.getRegion());
      rejectIfInStandByState(region);
      MutateResponse.Builder builder = MutateResponse.newBuilder();
      MutationProto mutation = request.getMutation();
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
      ActivePolicyEnforcement spaceQuotaEnforcement =
        getSpaceQuotaManager().getActiveEnforcements();

      if (request.hasCondition()) {
        // checkAndMutate path: the condition decides whether the mutation is applied.
        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
          request.getCondition(), nonceGroup, spaceQuotaEnforcement);
        builder.setProcessed(result.isSuccess());
        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
        if (clientCellBlockSupported) {
          addSize(context, result.getResult(), null);
        }
      } else {
        Result r = null;
        Boolean processed = null;
        MutationType type = mutation.getMutateType();
        switch (type) {
          case APPEND:
            // TODO: this doesn't actually check anything.
            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
            break;
          case INCREMENT:
            // TODO: this doesn't actually check anything.
            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
            break;
          case PUT:
            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          case DELETE:
            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          default:
            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
        }
        if (processed != null) {
          builder.setProcessed(processed);
        }
        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
        addResult(builder, r, controller, clientCellBlockSupported);
        if (clientCellBlockSupported) {
          addSize(context, r, null);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      server.checkFileSystem();
      throw new ServiceException(ie);
    } finally {
      if (quota != null) {
        quota.close();
      }
    }
  }

  // Applies a single Put after cell-size, space-quota and throughput-quota checks; updates
  // the put latency metric.
  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Put put = ProtobufUtil.toPut(mutation, cellScanner);
    checkCellSizeLimit(region, put);
    spaceQuota.getPolicyEnforcement(region).check(put);
    quota.addMutation(put);
    region.put(put);

    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updatePut(region.getRegionInfo().getTable(), after - before);
    }
  }

  // Applies a single Delete after cell-size, space-quota and throughput-quota checks; updates
  // the delete latency metric.
  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
    checkCellSizeLimit(region, delete);
    spaceQuota.getPolicyEnforcement(region).check(delete);
    quota.addMutation(delete);
    region.delete(delete);

    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updateDelete(region.getRegionInfo().getTable(), after - before);
    }
  }

  /**
   * Runs a single-mutation checkAndMutate, giving coprocessors a chance to replace
   * (preCheckAndMutate) or post-process (postCheckAndMutate) the result, and records
   * checkAndMutate plus checkAndPut/checkAndDelete metrics.
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
    quota.addMutation((Mutation) checkAndMutate.getAction());

    CheckAndMutateResult result = null;
    if (region.getCoprocessorHost() != null) {
      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
    }
    if (result == null) {
      // No coprocessor short-circuit; run the region-level operation.
      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
      if (region.getCoprocessorHost() != null) {
        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
      }
    }
    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updateCheckAndMutate(region.getRegionInfo().getTable(), after - before);

      MutationType type = mutation.getMutateType();
      switch (type) {
        case PUT:
          metricsRegionServer.updateCheckAndPut(region.getRegionInfo().getTable(), after - before);
          break;
        case DELETE:
          metricsRegionServer.updateCheckAndDelete(region.getRegionInfo().getTable(),
            after - before);
          break;
        default:
          break;
      }
    }
    return result;
  }

  // This is used to keep compatible with the old client implementation. Consider remove it if we
  // decide to drop the support of the client that still sends close request to a region scanner
  // which has already been exhausted.
  @Deprecated
  private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {

    private static final long serialVersionUID = -4305297078988180130L;

    @Override
    public synchronized Throwable fillInStackTrace() {
      // Shared singleton exception; suppress the stack trace so it is cheap to throw repeatedly.
      return this;
    }
  };

  /**
   * Look up the RegionScannerHolder for the scanner id in the request, verifying that the
   * scanner still exists, that its region is not a STANDBY replica, and that the region instance
   * has not changed underneath the scanner.
   */
  private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
    String scannerName = toScannerName(request.getScannerId());
    RegionScannerHolder rsh = this.scanners.get(scannerName);
    if (rsh == null) {
      // just ignore the next or close request if scanner does not exists.
      if (closedScanners.getIfPresent(scannerName) != null) {
        throw SCANNER_ALREADY_CLOSED;
      } else {
        LOG.warn("Client tried to access missing scanner " + scannerName);
        throw new UnknownScannerException(
          "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
            + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
            + "long wait between consecutive client checkins, c) Server may be closing down, "
            + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
            + "possible fix would be increasing the value of"
            + "'hbase.client.scanner.timeout.period' configuration.");
      }
    }
    rejectIfInStandByState(rsh.r);
    RegionInfo hri = rsh.s.getRegionInfo();
    // Yes, should be the same instance
    if (server.getOnlineRegion(hri.getRegionName()) != rsh.r) {
      String msg = "Region has changed on the scanner " + scannerName + ": regionName="
        + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r;
      LOG.warn(msg + ", closing...");
      scanners.remove(scannerName);
      try {
        rsh.s.close();
      } catch (IOException e) {
        LOG.warn("Getting exception closing " + scannerName, e);
      } finally {
        try {
          server.getLeaseManager().cancelLease(scannerName);
        } catch (LeaseException e) {
          LOG.warn("Getting exception closing " + scannerName, e);
        }
      }
      throw new NotServingRegionException(msg);
    }
    return rsh;
  }

  /**
   * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
   *         value.
   */
  private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request,
    ScanResponse.Builder builder) throws IOException {
    HRegion region = getRegion(request.getRegion());
    rejectIfInStandByState(region);
    ClientProtos.Scan protoScan = request.getScan();
    boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
    Scan scan = ProtobufUtil.toScan(protoScan);
    // if the request doesn't set this, get the default region setting.
    if (!isLoadingCfsOnDemandSet) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }

    if (!scan.hasFamilies()) {
      // Adding all families to scanner
      for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
        scan.addFamily(family);
      }
    }
    if (region.getCoprocessorHost() != null) {
      // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
      // wrapper for the core created RegionScanner
      region.getCoprocessorHost().preScannerOpen(scan);
    }
    // Keep a handle on the core scanner: the post hook may wrap the RegionScanner, but
    // shipped/close callbacks must go to the underlying Shipper.
    RegionScannerImpl coreScanner = region.getScanner(scan);
    Shipper shipper = coreScanner;
    RegionScanner scanner = coreScanner;
    try {
      if (region.getCoprocessorHost() != null) {
        scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
      }
    } catch (Exception e) {
      // Although region coprocessor is for advanced users and they should take care of the
      // implementation to not damage the HBase system, closing the scanner on exception here does
      // not have any bad side effect, so let's do it
      scanner.close();
      throw e;
    }
    long scannerId = scannerIdGenerator.generateNewScannerId();
    builder.setScannerId(scannerId);
    builder.setMvccReadPoint(scanner.getMvccReadPoint());
    builder.setTtl(scannerLeaseTimeoutPeriod);
    String scannerName = toScannerName(scannerId);

    // Full scans of user tables are tracked separately for metrics purposes.
    boolean fullRegionScan =
      !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region);

    return new Pair<String, RegionScannerHolder>(scannerName,
      addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan));
  }

  /**
   * The returned String is used as key doing look up of outstanding Scanners in this Servers'
   * this.scanners, the Map of outstanding scanners and their current state.
   * @param scannerId A scanner long id.
   * @return The long id as a String.
   */
  private static String toScannerName(long scannerId) {
    return Long.toString(scannerId);
  }

  /**
   * Validates the client's nextCallSeq against the holder's expected value; a mismatch means an
   * out-of-order (likely retried) next call and is rejected so the client can reopen the scanner.
   */
  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
    throws OutOfOrderScannerNextException {
    // if nextCallSeq does not match throw Exception straight away. This needs to be
    // performed even before checking of Lease.
    // See HBASE-5974
    if (request.hasNextCallSeq()) {
      long callSeq = request.getNextCallSeq();
      if (!rsh.incNextCallSeq(callSeq)) {
        throw new OutOfOrderScannerNextException(
          "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: "
            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
      }
    }
  }

  /** Re-registers a lease that was removed while the scan RPC was being processed. */
  private void addScannerLeaseBack(LeaseManager.Lease lease) {
    try {
      server.getLeaseManager().addLease(lease);
    } catch (LeaseStillHeldException e) {
      // should not happen as the scanner id is unique.
      throw new AssertionError(e);
    }
  }

  /**
   * Computes the absolute wall-clock deadline for this scan call, or -1 when no time limit
   * should be enforced (heartbeats disabled or no positive timeout configured).
   */
  // visible for testing only
  long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller,
    boolean allowHeartbeatMessages) {
    // Set the time limit to be half of the more restrictive timeout value (one of the
    // timeout values must be positive). In the event that both values are positive, the
    // more restrictive of the two is used to calculate the limit.
    if (allowHeartbeatMessages) {
      long now = EnvironmentEdgeManager.currentTime();
      long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now);
      if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) {
        long timeLimitDelta;
        if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) {
          timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout);
        } else {
          // Only one of the two is positive; use it.
          timeLimitDelta =
            scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout;
        }

        // Use half of whichever timeout value was more restrictive... But don't allow
        // the time limit to be less than the allowable minimum (could cause an
        // immediate timeout before scanning any data).
        timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
        return now + timeLimitDelta;
      }
    }
    // Default value of timeLimit is negative to indicate no timeLimit should be
    // enforced.
    return -1L;
  }

  /**
   * Remaining RPC timeout for this call (controller timeout preferred over the server default),
   * minus time already spent in the queue; -1 when no timeout is configured.
   */
  private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) {
    long timeout;
    if (controller != null && controller.getCallTimeout() > 0) {
      timeout = controller.getCallTimeout();
    } else if (rpcTimeout > 0) {
      timeout = rpcTimeout;
    } else {
      return -1;
    }
    if (call != null) {
      // Subtract time the call already spent queued before a handler picked it up.
      timeout -= (now - call.getReceiveTime());
    }
    // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high.
    // return minimum value here in that case so we count this in calculating the final delta.
    return Math.max(minimumScanTimeLimitDelta, timeout);
  }

  /**
   * If the client-requested row limit has been reached, marks the response as having no more
   * results so the caller breaks out of the scan loop.
   */
  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
    ScannerContext scannerContext, ScanResponse.Builder builder) {
    if (numOfCompleteRows >= limitOfRows) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows
          + " scannerContext: " + scannerContext);
      }
      builder.setMoreResults(false);
    }
  }

  // The scan loop below reports whether we have more results in region via
  // builder.setMoreResultsInRegion (the method itself returns void).
  /**
   * Core scan loop for a single scan RPC: repeatedly calls {@code RegionScanner.nextRaw} under
   * the scanner lock, accumulating up to {@code maxResults} Results while honoring the size,
   * time, batch and row limits carried in the {@link ScannerContext}. Communicates
   * "more results in region" and heartbeat/cursor state to the client via {@code builder};
   * updates region and server scan metrics in the finally block even on error.
   */
  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
    long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
    ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCall rpcCall)
    throws IOException {
    HRegion region = rsh.r;
    RegionScanner scanner = rsh.s;
    long maxResultSize;
    // The effective size cap is the tighter of the scanner's own limit and the quota limit.
    if (scanner.getMaxResultSize() > 0) {
      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
    } else {
      maxResultSize = maxQuotaResultSize;
    }
    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
    // arbitrary 32. TODO: keep record of general size of results being returned.
    ArrayList<Cell> values = new ArrayList<>(32);
    region.startRegionOperation(Operation.SCAN);
    long before = EnvironmentEdgeManager.currentTime();
    // Used to check if we've matched the row limit set on the Scan
    int numOfCompleteRows = 0;
    // Count of times we call nextRaw; can be > numOfCompleteRows.
    int numOfNextRawCalls = 0;
    try {
      int numOfResults = 0;
      synchronized (scanner) {
        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
        boolean clientHandlesPartials =
          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
        boolean clientHandlesHeartbeats =
          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();

        // On the server side we must ensure that the correct ordering of partial results is
        // returned to the client to allow them to properly reconstruct the partial results.
        // If the coprocessor host is adding to the result list, we cannot guarantee the
        // correct ordering of partial results and so we prevent partial results from being
        // formed.
        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
        boolean moreRows = false;

        // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
        // certain time threshold on the server. When the time threshold is exceeded, the
        // server stops the scan and sends back whatever Results it has accumulated within
        // that time period (may be empty). Since heartbeat messages have the potential to
        // create partial Results (in the event that the timeout occurs in the middle of a
        // row), we must only generate heartbeat messages when the client can handle both
        // heartbeats AND partials
        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;

        long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);

        final LimitScope sizeScope =
          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
        final LimitScope timeScope =
          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;

        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();

        // Configure with limits for this RPC. Set keep progress true since size progress
        // towards size limit should be kept between calls to nextRaw
        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
        // of heap size occupied by cells(being read). Cell data means its key and value parts.
        contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize);
        contextBuilder.setBatchLimit(scanner.getBatch());
        contextBuilder.setTimeLimit(timeScope, timeLimit);
        contextBuilder.setTrackMetrics(trackMetrics);
        ScannerContext scannerContext = contextBuilder.build();
        boolean limitReached = false;
        while (numOfResults < maxResults) {
          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
          // batch limit is a limit on the number of cells per Result. Thus, if progress is
          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
          // reset the batch progress between nextRaw invocations since we don't want the
          // batch progress from previous calls to affect future calls
          scannerContext.setBatchProgress(0);
          assert values.isEmpty();

          // Collect values to be returned here
          moreRows = scanner.nextRaw(values, scannerContext);
          if (rpcCall == null) {
            // When there is no RpcCallContext,copy EC to heap, then the scanner would close,
            // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
            // buffers.See more details in HBASE-26036.
            CellUtil.cloneIfNecessary(values);
          }
          numOfNextRawCalls++;

          if (!values.isEmpty()) {
            if (limitOfRows > 0) {
              // First we need to check if the last result is partial and we have a row change. If
              // so then we need to increase the numOfCompleteRows.
              if (results.isEmpty()) {
                if (
                  rsh.rowOfLastPartialResult != null
                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              } else {
                Result lastResult = results.get(results.size() - 1);
                if (
                  lastResult.mayHaveMoreCellsInRow()
                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              }
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
            lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
            results.add(r);
            numOfResults++;
            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
              // A complete row was just added; re-check the row limit.
              numOfCompleteRows++;
              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
          } else if (!moreRows && !results.isEmpty()) {
            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
            // last result is false. Otherwise it's possible that: the first nextRaw returned
            // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
            // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
            // return with moreRows=false, which means moreResultsInRegion would be false, it will
            // be a contradictory state (HBASE-21206).
            int lastIdx = results.size() - 1;
            Result r = results.get(lastIdx);
            if (r.mayHaveMoreCellsInRow()) {
              results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
            }
          }
          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
          boolean resultsLimitReached = numOfResults >= maxResults;
          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

          if (limitReached || !moreRows) {
            // We only want to mark a ScanResponse as a heartbeat message in the event that
            // there are more values to be read server side. If there aren't more values,
            // marking it as a heartbeat is wasteful because the client will need to issue
            // another ScanRequest only to realize that they already have all the values
            if (moreRows && timeLimitReached) {
              // Heartbeat messages occur when the time limit has been reached.
              builder.setHeartbeatMessage(true);
              if (rsh.needCursor) {
                Cell cursorCell = scannerContext.getLastPeekedCell();
                if (cursorCell != null) {
                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
                }
              }
            }
            break;
          }
          values.clear();
        }
        builder.setMoreResultsInRegion(moreRows);
        // Check to see if the client requested that we track metrics server side. If the
        // client requested metrics, retrieve the metrics from the scanner context.
        if (trackMetrics) {
          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();

          for (Entry<String, Long> entry : metrics.entrySet()) {
            pairBuilder.setName(entry.getKey());
            pairBuilder.setValue(entry.getValue());
            metricBuilder.addMetrics(pairBuilder.build());
          }

          builder.setScanMetrics(metricBuilder.build());
        }
      }
    } finally {
      region.closeRegionOperation();
      // Update serverside metrics, even on error.
      long end = EnvironmentEdgeManager.currentTime();
      long responseCellSize = rpcCall != null ? rpcCall.getResponseCellSize() : 0;
      region.getMetrics().updateScanTime(end - before);
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateScanSize(region.getTableDescriptor().getTableName(),
          responseCellSize);
        metricsRegionServer.updateScanTime(region.getTableDescriptor().getTableName(),
          end - before);
        metricsRegionServer.updateReadQueryMeter(region.getRegionInfo().getTable(),
          numOfNextRawCalls);
      }
    }
    // coprocessor postNext hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
    }
  }

  /**
   * Scan data in a table.
   * @param controller the RPC controller
   * @param request the scan request
   * @return the scan response
   */
  @Override
  public ScanResponse scan(final RpcController controller, final ScanRequest request)
    throws ServiceException {
    if (controller != null && !(controller instanceof HBaseRpcController)) {
      throw new UnsupportedOperationException(
        "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
    }
    if (!request.hasScannerId() && !request.hasScan()) {
      throw new ServiceException(
        new DoNotRetryIOException("Missing required input: scannerId or scan"));
    }
    try {
      checkOpen();
    } catch (IOException e) {
      // Server is going down: best-effort cancel of the scanner's lease before failing the RPC.
      if (request.hasScannerId()) {
        String scannerName = toScannerName(request.getScannerId());
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Server shutting down and client tried to access missing scanner " + scannerName);
        }
        final LeaseManager leaseManager = server.getLeaseManager();
        if (leaseManager != null) {
          try {
            leaseManager.cancelLease(scannerName);
          } catch (LeaseException le) {
            // No problem, ignore
            if (LOG.isTraceEnabled()) {
              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
            }
          }
        }
      }
      throw new ServiceException(e);
    }
    requestCount.increment();
    rpcScanRequestCount.increment();
    RegionScannerHolder rsh;
    ScanResponse.Builder builder = ScanResponse.newBuilder();
    String scannerName;
    try {
      if (request.hasScannerId()) {
        // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
        // for more details.
        long scannerId = request.getScannerId();
        builder.setScannerId(scannerId);
        scannerName = toScannerName(scannerId);
        rsh = getRegionScanner(request);
      } else {
        // No scanner id: this is an open-scanner request.
        Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder);
        scannerName = scannerNameAndRSH.getFirst();
        rsh = scannerNameAndRSH.getSecond();
      }
    } catch (IOException e) {
      if (e == SCANNER_ALREADY_CLOSED) {
        // Now we will close scanner automatically if there are no more results for this region but
        // the old client will still send a close request to us. Just ignore it and return.
        return builder.build();
      }
      throw new ServiceException(e);
    }
    if (rsh.fullRegionScan) {
      rpcFullScanRequestCount.increment();
    }
    HRegion region = rsh.r;
    LeaseManager.Lease lease;
    try {
      // Remove lease while its being processed in server; protects against case
      // where processing of request takes > lease expiration time. or null if none found.
      lease = server.getLeaseManager().removeLease(scannerName);
    } catch (LeaseException e) {
      throw new ServiceException(e);
    }
    if (request.hasRenew() && request.getRenew()) {
      // add back and return
      addScannerLeaseBack(lease);
      try {
        checkScanNextCallSeq(request, rsh);
      } catch (OutOfOrderScannerNextException e) {
        throw new ServiceException(e);
      }
      return builder.build();
    }
    OperationQuota quota;
    try {
      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
    } catch (IOException e) {
      // Quota rejected the call: restore the lease before failing.
      addScannerLeaseBack(lease);
      throw new ServiceException(e);
    }
    try {
      checkScanNextCallSeq(request, rsh);
    } catch (OutOfOrderScannerNextException e) {
      addScannerLeaseBack(lease);
      throw new ServiceException(e);
    }
    // Now we have increased the next call sequence. If we give client an error, the retry will
    // never success. So we'd better close the scanner and return a DoNotRetryIOException to client
    // and then client will try to open a new scanner.
    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
    int rows; // this is scan.getCaching
    if (request.hasNumberOfRows()) {
      rows = request.getNumberOfRows();
    } else {
      rows = closeScanner ? 0 : 1;
    }
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    // now let's do the real scan.
    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
    RegionScanner scanner = rsh.s;
    // this is the limit of rows for this scan, if we the number of rows reach this value, we will
    // close the scanner.
    int limitOfRows;
    if (request.hasLimitOfRows()) {
      limitOfRows = request.getLimitOfRows();
    } else {
      limitOfRows = -1;
    }
    MutableObject<Object> lastBlock = new MutableObject<>();
    boolean scannerClosed = false;
    try {
      List<Result> results = new ArrayList<>(Math.min(rows, 512));
      if (rows > 0) {
        boolean done = false;
        // Call coprocessor. Get region info from scanner.
        if (region.getCoprocessorHost() != null) {
          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
          if (!results.isEmpty()) {
            // Account the size of coprocessor-provided results toward the response.
            for (Result r : results) {
              lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
            }
          }
          if (bypass != null && bypass.booleanValue()) {
            done = true;
          }
        }
        if (!done) {
          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
            results, builder, lastBlock, rpcCall);
        } else {
          builder.setMoreResultsInRegion(!results.isEmpty());
        }
      } else {
        // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
        builder.setMoreResultsInRegion(true);
      }

      quota.addScanResult(results);
      addResults(builder, results, (HBaseRpcController) controller,
        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
        isClientCellBlockSupport(rpcCall));
      if (scanner.isFilterDone() && results.isEmpty()) {
        // If the scanner's filter - if any - is done with the scan
        // only set moreResults to false if the results is empty. This is used to keep compatible
        // with the old scan implementation where we just ignore the returned results if moreResults
        // is false. Can remove the isEmpty check after we get rid of the old implementation.
        builder.setMoreResults(false);
      }
      // Later we may close the scanner depending on this flag so here we need to make sure that we
      // have already set this flag.
      assert builder.hasMoreResultsInRegion();
      // we only set moreResults to false in the above code, so set it to true if we haven't set it
      // yet.
      if (!builder.hasMoreResults()) {
        builder.setMoreResults(true);
      }
      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
        // Record the last cell of the last result if it is a partial result
        // We need this to calculate the complete rows we have returned to client as the
        // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
        // current row. We may filter out all the remaining cells for the current row and just
        // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
        // check for row change.
        Result lastResult = results.get(results.size() - 1);
        if (lastResult.mayHaveMoreCellsInRow()) {
          rsh.rowOfLastPartialResult = lastResult.getRow();
        } else {
          rsh.rowOfLastPartialResult = null;
        }
      }
      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
        scannerClosed = true;
        closeScanner(region, scanner, scannerName, rpcCall);
      }

      // There's no point returning to a timed out client. Throwing ensures scanner is closed
      if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) {
        throw new TimeoutIOException("Client deadline exceeded, cannot return results");
      }

      return builder.build();
    } catch (IOException e) {
      try {
        // scanner is closed here
        scannerClosed = true;
        // The scanner state might be left in a dirty state, so we will tell the Client to
        // fail this RPC and close the scanner while opening up another one from the start of
        // row that the client has last seen.
        closeScanner(region, scanner, scannerName, rpcCall);

        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
        // used in two different semantics.
        // (1) The first is to close the client scanner and bubble up the exception all the way
        // to the application. This is preferred when the exception is really un-recoverable
        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
        // bucket usually.
        // (2) Second semantics is to close the current region scanner only, but continue the
        // client scanner by overriding the exception. This is usually UnknownScannerException,
        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
        // application-level ClientScanner has to continue without bubbling up the exception to
        // the client. See ClientScanner code to see how it deals with these special exceptions.
        if (e instanceof DoNotRetryIOException) {
          throw e;
        }

        // If it is a FileNotFoundException, wrap as a
        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
        if (e instanceof FileNotFoundException) {
          throw new DoNotRetryIOException(e);
        }

        // We closed the scanner already. Instead of throwing the IOException, and client
        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
        // a special exception to save an RPC.
        if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
          // 1.4.0+ clients know how to handle
          throw new ScannerResetException("Scanner is closed on the server-side", e);
        } else {
          // older clients do not know about SRE. Just throw USE, which they will handle
          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
            + " scanner state for clients older than 1.3.", e);
        }
      } catch (IOException ioe) {
        throw new ServiceException(ioe);
      }
    } finally {
      if (!scannerClosed) {
        // Adding resets expiration time on lease.
        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
        if (rpcCall != null) {
          rpcCall.setCallBack(rsh.shippedCallback);
        } else {
          // If context is null,here we call rsh.shippedCallback directly to reuse the logic in
          // rsh.shippedCallback to release the internal resources in rsh,and lease is also added
          // back to regionserver's LeaseManager in rsh.shippedCallback.
          runShippedCallback(rsh);
        }
      }
      quota.close();
    }
  }

  /**
   * Runs the holder's shipped callback directly (used when there is no RpcCall to attach it to),
   * wrapping any IOException in a ServiceException for the RPC layer.
   */
  private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
    assert rsh.shippedCallback != null;
    try {
      rsh.shippedCallback.run();
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
  }

  /**
   * Closes and deregisters the named scanner, honoring the coprocessor preScannerClose bypass;
   * when an RpcCallContext is present the actual close is deferred to its close callback.
   */
  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
    RpcCallContext context) throws IOException {
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preScannerClose(scanner)) {
        // bypass the actual close.
        return;
      }
    }
    RegionScannerHolder rsh = scanners.remove(scannerName);
    if (rsh != null) {
      if (context != null) {
        // Defer the real close to the RPC framework via the close callback.
        context.setCallBack(rsh.closeCallBack);
      } else {
        rsh.s.close();
      }
      if (region.getCoprocessorHost() != null) {
        region.getCoprocessorHost().postScannerClose(scanner);
      }
      // Remember closed ids so stale close requests from old clients can be ignored.
      closedScanners.put(scannerName, scannerName);
    }
  }

  /** Delegates a region-server-level coprocessor service call to the server after a pre-check. */
  @Override
  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
    CoprocessorServiceRequest request) throws ServiceException {
    rpcPreCheck("execRegionServerService");
    return server.execRegionServerService(controller, request);
  }

  /**
   * Returns the space-quota snapshots currently known to this region server; empty when the
   * space-quota manager is not running.
   */
  @Override
  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
    GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
    try {
      final RegionServerSpaceQuotaManager manager = server.getRegionServerSpaceQuotaManager();
      final GetSpaceQuotaSnapshotsResponse.Builder builder =
        GetSpaceQuotaSnapshotsResponse.newBuilder();
      if (manager != null) {
        final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
        for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
            .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
            .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build());
        }
      }
      return builder.build();
    } catch (Exception e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Evicts block-cache entries for the requested regions, accumulating per-region eviction
   * stats; per-region failures are recorded in the stats rather than failing the whole call.
   */
  @Override
  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
    ClearRegionBlockCacheRequest request) throws ServiceException {
    rpcPreCheck("clearRegionBlockCache");
    ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder();
    CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
    List<HRegion> regions = getRegions(request.getRegionList(), stats);
    for (HRegion region : regions) {
      try {
        stats = stats.append(this.server.clearRegionBlockCache(region));
      } catch (Exception e) {
        stats.addException(region.getRegionInfo().getRegionName(), e);
      }
    }
    stats.withMaxCacheSize(server.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
    return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
  }

  /**
   * Schedules an AssignRegionHandler for every region in the open request. Table descriptors
   * are looked up through the per-request cache {@code tdCache}; a descriptor lookup failure is
   * logged but does not prevent scheduling (the handler retries and reports to the master).
   */
  private void executeOpenRegionProcedures(OpenRegionRequest request,
    Map<TableName, TableDescriptor> tdCache) {
    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableName tableName = regionInfo.getTable();
      TableDescriptor tableDesc = tdCache.get(tableName);
      if (tableDesc == null) {
        try {
          tableDesc = server.getTableDescriptors().get(regionInfo.getTable());
        } catch (IOException e) {
          // Here we do not fail the whole method since we also need deal with other
          // procedures, and we can not ignore this one, so we still schedule a
          // AssignRegionHandler and it will report back to master if we still can not get the
          // TableDescriptor.
          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
            regionInfo.getTable(), e);
        }
        if (tableDesc != null) {
          tdCache.put(tableName, tableDesc);
        }
      }
      if (regionOpenInfo.getFavoredNodesCount() > 0) {
        server.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
          regionOpenInfo.getFavoredNodesList());
      }
      long procId = regionOpenInfo.getOpenProcId();
      // Only schedule the handler if this procedure id has not been submitted already.
      if (server.submitRegionProcedure(procId)) {
        server.getExecutorService().submit(
          AssignRegionHandler.create(server, regionInfo, procId, tableDesc, masterSystemTime));
      }
    }
  }

  private void executeCloseRegionProcedures(CloseRegionRequest request) {
    String encodedName;
    try {
      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
    } catch (DoNotRetryIOException e) {
      throw new UncheckedIOException("Should not happen", e);
    }
    ServerName destination = request.hasDestinationServer()
      ?
ProtobufUtil.toServerName(request.getDestinationServer()) 3863 : null; 3864 long procId = request.getCloseProcId(); 3865 if (server.submitRegionProcedure(procId)) { 3866 server.getExecutorService() 3867 .submit(UnassignRegionHandler.create(server, encodedName, procId, false, destination)); 3868 } 3869 } 3870 3871 private void executeProcedures(RemoteProcedureRequest request) { 3872 RSProcedureCallable callable; 3873 try { 3874 callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class) 3875 .getDeclaredConstructor().newInstance(); 3876 } catch (Exception e) { 3877 LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(), 3878 request.getProcId(), e); 3879 server.remoteProcedureComplete(request.getProcId(), e); 3880 return; 3881 } 3882 callable.init(request.getProcData().toByteArray(), server); 3883 LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId()); 3884 server.executeProcedure(request.getProcId(), callable); 3885 } 3886 3887 @Override 3888 @QosPriority(priority = HConstants.ADMIN_QOS) 3889 public ExecuteProceduresResponse executeProcedures(RpcController controller, 3890 ExecuteProceduresRequest request) throws ServiceException { 3891 try { 3892 checkOpen(); 3893 throwOnWrongStartCode(request); 3894 server.getRegionServerCoprocessorHost().preExecuteProcedures(); 3895 if (request.getOpenRegionCount() > 0) { 3896 // Avoid reading from the TableDescritor every time(usually it will read from the file 3897 // system) 3898 Map<TableName, TableDescriptor> tdCache = new HashMap<>(); 3899 request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache)); 3900 } 3901 if (request.getCloseRegionCount() > 0) { 3902 request.getCloseRegionList().forEach(this::executeCloseRegionProcedures); 3903 } 3904 if (request.getProcCount() > 0) { 3905 request.getProcList().forEach(this::executeProcedures); 3906 } 3907 
server.getRegionServerCoprocessorHost().postExecuteProcedures(); 3908 return ExecuteProceduresResponse.getDefaultInstance(); 3909 } catch (IOException e) { 3910 throw new ServiceException(e); 3911 } 3912 } 3913 3914 @Override 3915 public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller, 3916 GetAllBootstrapNodesRequest request) throws ServiceException { 3917 GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder(); 3918 server.getBootstrapNodes() 3919 .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server))); 3920 return builder.build(); 3921 } 3922}