001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.UncheckedIOException; 024import java.net.BindException; 025import java.net.InetAddress; 026import java.net.InetSocketAddress; 027import java.nio.ByteBuffer; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.HashMap; 032import java.util.Iterator; 033import java.util.List; 034import java.util.Map; 035import java.util.Map.Entry; 036import java.util.NavigableMap; 037import java.util.Set; 038import java.util.TreeSet; 039import java.util.concurrent.ConcurrentHashMap; 040import java.util.concurrent.ConcurrentMap; 041import java.util.concurrent.TimeUnit; 042import java.util.concurrent.atomic.AtomicBoolean; 043import java.util.concurrent.atomic.AtomicLong; 044import java.util.concurrent.atomic.LongAdder; 045import org.apache.commons.lang3.mutable.MutableObject; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.fs.FileSystem; 048import 
org.apache.hadoop.fs.Path; 049import org.apache.hadoop.hbase.ByteBufferExtendedCell; 050import org.apache.hadoop.hbase.CacheEvictionStats; 051import org.apache.hadoop.hbase.CacheEvictionStatsBuilder; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.CellScannable; 054import org.apache.hadoop.hbase.CellScanner; 055import org.apache.hadoop.hbase.CellUtil; 056import org.apache.hadoop.hbase.DoNotRetryIOException; 057import org.apache.hadoop.hbase.DroppedSnapshotException; 058import org.apache.hadoop.hbase.HBaseIOException; 059import org.apache.hadoop.hbase.HBaseRpcServicesBase; 060import org.apache.hadoop.hbase.HConstants; 061import org.apache.hadoop.hbase.MultiActionResultTooLarge; 062import org.apache.hadoop.hbase.NotServingRegionException; 063import org.apache.hadoop.hbase.PrivateCellUtil; 064import org.apache.hadoop.hbase.RegionTooBusyException; 065import org.apache.hadoop.hbase.Server; 066import org.apache.hadoop.hbase.ServerName; 067import org.apache.hadoop.hbase.TableName; 068import org.apache.hadoop.hbase.UnknownScannerException; 069import org.apache.hadoop.hbase.client.Append; 070import org.apache.hadoop.hbase.client.CheckAndMutate; 071import org.apache.hadoop.hbase.client.CheckAndMutateResult; 072import org.apache.hadoop.hbase.client.Delete; 073import org.apache.hadoop.hbase.client.Durability; 074import org.apache.hadoop.hbase.client.Get; 075import org.apache.hadoop.hbase.client.Increment; 076import org.apache.hadoop.hbase.client.Mutation; 077import org.apache.hadoop.hbase.client.OperationWithAttributes; 078import org.apache.hadoop.hbase.client.Put; 079import org.apache.hadoop.hbase.client.RegionInfo; 080import org.apache.hadoop.hbase.client.RegionReplicaUtil; 081import org.apache.hadoop.hbase.client.Result; 082import org.apache.hadoop.hbase.client.Row; 083import org.apache.hadoop.hbase.client.Scan; 084import org.apache.hadoop.hbase.client.TableDescriptor; 085import org.apache.hadoop.hbase.client.VersionInfoUtil; 086import 
org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 087import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 088import org.apache.hadoop.hbase.exceptions.ScannerResetException; 089import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 090import org.apache.hadoop.hbase.io.ByteBuffAllocator; 091import org.apache.hadoop.hbase.io.hfile.BlockCache; 092import org.apache.hadoop.hbase.ipc.HBaseRpcController; 093import org.apache.hadoop.hbase.ipc.PriorityFunction; 094import org.apache.hadoop.hbase.ipc.QosPriority; 095import org.apache.hadoop.hbase.ipc.RpcCall; 096import org.apache.hadoop.hbase.ipc.RpcCallContext; 097import org.apache.hadoop.hbase.ipc.RpcCallback; 098import org.apache.hadoop.hbase.ipc.RpcServer; 099import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 100import org.apache.hadoop.hbase.ipc.RpcServerFactory; 101import org.apache.hadoop.hbase.ipc.RpcServerInterface; 102import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 103import org.apache.hadoop.hbase.ipc.ServerRpcController; 104import org.apache.hadoop.hbase.net.Address; 105import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 106import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; 107import org.apache.hadoop.hbase.quotas.OperationQuota; 108import org.apache.hadoop.hbase.quotas.QuotaUtil; 109import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 110import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 111import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 112import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; 113import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease; 114import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException; 115import org.apache.hadoop.hbase.regionserver.Region.Operation; 116import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 117import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 118import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler; 119import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; 120import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; 121import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; 122import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; 123import org.apache.hadoop.hbase.replication.ReplicationUtils; 124import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker; 125import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker; 126import org.apache.hadoop.hbase.security.Superusers; 127import org.apache.hadoop.hbase.security.access.Permission; 128import org.apache.hadoop.hbase.util.Bytes; 129import org.apache.hadoop.hbase.util.DNS; 130import org.apache.hadoop.hbase.util.DNS.ServerType; 131import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 132import org.apache.hadoop.hbase.util.Pair; 133import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 134import org.apache.hadoop.hbase.wal.WAL; 135import org.apache.hadoop.hbase.wal.WALEdit; 136import org.apache.hadoop.hbase.wal.WALKey; 137import org.apache.hadoop.hbase.wal.WALSplitUtil; 138import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 139import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 140import org.apache.yetus.audience.InterfaceAudience; 141import org.slf4j.Logger; 142import org.slf4j.LoggerFactory; 143 144import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 145import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 146import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 147import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 148import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 149import 
org.apache.hbase.thirdparty.com.google.protobuf.Message; 150import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 151import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 152import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 153import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 154import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 155 156import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 157import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 158import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 172import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 192import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 213import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 235import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 245 246/** 247 * Implements the regionserver RPC services. 248 */ 249@InterfaceAudience.Private 250public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer> 251 implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface { 252 253 private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class); 254 255 /** RPC scheduler to use for the region server. */ 256 public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = 257 "hbase.region.server.rpc.scheduler.factory.class"; 258 259 /** 260 * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This 261 * configuration exists to prevent the scenario where a time limit is specified to be so 262 * restrictive that the time limit is reached immediately (before any cells are scanned). 
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";

  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   * (milliseconds).
   */
  private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch.
  // LongAdder is used instead of AtomicLong because these are contended, write-mostly counters.
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Upper bound on the total result size a single scan RPC may return.
  private final long maxScannerResultSize;

  // Generates ids for client scanners. Not final; presumably assigned during service
  // start-up outside this chunk — TODO confirm the initialization site.
  private ScannerIdGenerator scannerIdGenerator;
  // All currently-open region scanners, keyed by scanner name.
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
  // which may send next or close request to a region scanner which has already been exhausted. The
  // entries will be removed automatically after scannerLeaseTimeoutPeriod.
  private final Cache<String, String> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit (milliseconds); see
   * {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}.
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;
  /*
   * Whether we should reject requests with very high no of rows i.e. beyond threshold defined by
   * rowSizeWarnThreshold
   */
  private final boolean rejectRowsWithSizeOverThreshold;

  // Set while a clear-compaction-queues request is in flight so concurrent requests are rejected.
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below booleans
   * to selectively enable/disable either Admin or Client Service (Rare is the case where you would
   * ever turn off one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
    "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
    "hbase.regionserver.client.executorService";
  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
    "hbase.regionserver.client.meta.executorService";

  /**
   * An Rpc callback for closing a RegionScanner.
   */
  private static final class RegionScannerCloseCallBack implements RpcCallback {

    // The scanner to close when the RPC call completes.
    private final RegionScanner scanner;

    public RegionScannerCloseCallBack(RegionScanner scanner) {
      this.scanner = scanner;
    }

    @Override
    public void run() throws IOException {
      this.scanner.close();
    }
  }

  /**
   * An Rpc callback for doing shipped() call on a RegionScanner. Also re-adds the scanner's lease
   * (which was removed for the duration of the RPC) once the call is done, provided the scanner is
   * still registered. Non-static on purpose: it reads the outer {@code scanners} map and
   * {@code server} field.
   */
  private class RegionScannerShippedCallBack implements RpcCallback {
    private final String scannerName;
    private final Shipper shipper;
    private final Lease lease;

    public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
      this.scannerName = scannerName;
      this.shipper = shipper;
      this.lease = lease;
    }

    @Override
    public void run() throws IOException {
      this.shipper.shipped();
      // We're done. On way out re-add the above removed lease. The lease was temp removed for this
      // Rpc call and we are at end of the call now. Time to add it back.
      // Only re-add if the scanner is still open; a lease of null means no lease was held.
      if (scanners.containsKey(scannerName)) {
        if (lease != null) {
          server.getLeaseManager().addLease(lease);
        }
      }
    }
  }

  /**
   * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
   * completion of multiGets. Close failures are logged per-scanner so one bad scanner does not
   * prevent the rest from being closed.
   */
  static class RegionScannersCloseCallBack implements RpcCallback {
    private final List<RegionScanner> scanners = new ArrayList<>();

    public void addScanner(RegionScanner scanner) {
      this.scanners.add(scanner);
    }

    @Override
    public void run() {
      for (RegionScanner scanner : scanners) {
        try {
          scanner.close();
        } catch (IOException e) {
          LOG.error("Exception while closing the scanner " + scanner, e);
        }
      }
    }
  }

  /**
   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
   */
  static final class RegionScannerHolder {
    // Sequence number of the next expected scan call; used to detect out-of-order or
    // concurrent scan RPCs on the same scanner (see incNextCallSeq).
    private final AtomicLong nextCallSeq = new AtomicLong(0);
    // The scanner itself.
    private final RegionScanner s;
    // The region the scanner is reading from.
    private final HRegion r;
    // Callback run when the scanner is closed.
    private final RpcCallback closeCallBack;
    // Callback run when a response has been shipped to the client.
    private final RpcCallback shippedCallback;
    // Row of the last partial result returned; used to stitch partial rows across calls.
    private byte[] rowOfLastPartialResult;
    // Whether the client asked for cursor results on heartbeat responses.
    private boolean needCursor;
    // Whether this scan may end up scanning the whole region (for metrics/accounting).
    private boolean fullRegionScan;
    // Client identity, retained for lease-expiry log messages.
    private final String clientIPAndPort;
    private final String userName;

    RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
      RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
      String clientIPAndPort, String userName) {
      this.s = s;
      this.r = r;
      this.closeCallBack = closeCallBack;
      this.shippedCallback = shippedCallback;
      this.needCursor = needCursor;
      this.fullRegionScan = fullRegionScan;
      this.clientIPAndPort = clientIPAndPort;
      this.userName = userName;
    }

    long getNextCallSeq() {
      return nextCallSeq.get();
    }

    boolean incNextCallSeq(long currentSeq) {
      // Use CAS to prevent multiple scan request running on the same scanner.
      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
    }

    // Should be called only when we need to print lease expired messages otherwise
    // cache the String once made.
    @Override
    public String toString() {
      return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName
        + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString();
    }
  }

  /**
   * Instantiated as a scanner lease.
If the lease times out, the scanner is closed: the scanner is
   * removed from the registry, the region's coprocessor pre/post close hooks are invoked around
   * the actual close, and errors during close are logged rather than propagated.
   */
  private class ScannerListener implements LeaseListener {
    private final String scannerName;

    ScannerListener(final String n) {
      this.scannerName = n;
    }

    @Override
    public void leaseExpired() {
      // Remove first so concurrent scan RPCs for this scanner fail fast.
      RegionScannerHolder rsh = scanners.remove(this.scannerName);
      if (rsh == null) {
        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
        return;
      }
      LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
      server.getMetrics().incrScannerLeaseExpired();
      RegionScanner s = rsh.s;
      HRegion region = null;
      try {
        region = server.getRegion(s.getRegionInfo().getRegionName());
        if (region != null && region.getCoprocessorHost() != null) {
          region.getCoprocessorHost().preScannerClose(s);
        }
      } catch (IOException e) {
        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
      } finally {
        // Close the scanner even if the pre-hook or region lookup failed.
        try {
          s.close();
          if (region != null && region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().postScannerClose(s);
          }
        } catch (IOException e) {
          LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
        }
      }
    }
  }

  // Wraps a pb Result into a ResultOrException carrying the action's index in the multi request.
  private static ResultOrException getResultOrException(final ClientProtos.Result r,
    final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(r), index);
  }

  // Wraps an exception into a ResultOrException carrying the action's index in the multi request.
  private static ResultOrException getResultOrException(final Exception e, final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(e), index);
  }

  // Common tail for the two overloads above: stamp the index and build.
  private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
    final int index) {
    return builder.setIndex(index).build();
  }

  /**
   * Checks for the following pre-checks in order:
   * <ol>
   * <li>RegionServer is running</li>
   * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
   * </ol>
* @param requestName name of rpc request. Used in reporting failures to provide context. 524 * @throws ServiceException If any of the above listed pre-check fails. 525 */ 526 private void rpcPreCheck(String requestName) throws ServiceException { 527 try { 528 checkOpen(); 529 requirePermission(requestName, Permission.Action.ADMIN); 530 } catch (IOException ioe) { 531 throw new ServiceException(ioe); 532 } 533 } 534 535 private boolean isClientCellBlockSupport(RpcCallContext context) { 536 return context != null && context.isClientCellBlockSupported(); 537 } 538 539 private void addResult(final MutateResponse.Builder builder, final Result result, 540 final HBaseRpcController rpcc, boolean clientCellBlockSupported) { 541 if (result == null) return; 542 if (clientCellBlockSupported) { 543 builder.setResult(ProtobufUtil.toResultNoData(result)); 544 rpcc.setCellScanner(result.cellScanner()); 545 } else { 546 ClientProtos.Result pbr = ProtobufUtil.toResult(result); 547 builder.setResult(pbr); 548 } 549 } 550 551 private void addResults(ScanResponse.Builder builder, List<Result> results, 552 HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) { 553 builder.setStale(!isDefaultRegion); 554 if (results.isEmpty()) { 555 return; 556 } 557 if (clientCellBlockSupported) { 558 for (Result res : results) { 559 builder.addCellsPerResult(res.size()); 560 builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow()); 561 } 562 controller.setCellScanner(CellUtil.createCellScanner(results)); 563 } else { 564 for (Result res : results) { 565 ClientProtos.Result pbr = ProtobufUtil.toResult(res); 566 builder.addResults(pbr); 567 } 568 } 569 } 570 571 private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions, 572 CellScanner cellScanner, Condition condition, long nonceGroup, 573 ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { 574 int countOfCompleteMutation = 0; 575 try { 576 if 
(!region.getRegionInfo().isMetaRegion()) { 577 server.getMemStoreFlusher().reclaimMemStoreMemory(); 578 } 579 List<Mutation> mutations = new ArrayList<>(); 580 long nonce = HConstants.NO_NONCE; 581 for (ClientProtos.Action action : actions) { 582 if (action.hasGet()) { 583 throw new DoNotRetryIOException( 584 "Atomic put and/or delete only, not a Get=" + action.getGet()); 585 } 586 MutationProto mutation = action.getMutation(); 587 MutationType type = mutation.getMutateType(); 588 switch (type) { 589 case PUT: 590 Put put = ProtobufUtil.toPut(mutation, cellScanner); 591 ++countOfCompleteMutation; 592 checkCellSizeLimit(region, put); 593 spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); 594 mutations.add(put); 595 break; 596 case DELETE: 597 Delete del = ProtobufUtil.toDelete(mutation, cellScanner); 598 ++countOfCompleteMutation; 599 spaceQuotaEnforcement.getPolicyEnforcement(region).check(del); 600 mutations.add(del); 601 break; 602 case INCREMENT: 603 Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner); 604 nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 605 ++countOfCompleteMutation; 606 checkCellSizeLimit(region, increment); 607 spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment); 608 mutations.add(increment); 609 break; 610 case APPEND: 611 Append append = ProtobufUtil.toAppend(mutation, cellScanner); 612 nonce = mutation.hasNonce() ? 
mutation.getNonce() : HConstants.NO_NONCE; 613 ++countOfCompleteMutation; 614 checkCellSizeLimit(region, append); 615 spaceQuotaEnforcement.getPolicyEnforcement(region).check(append); 616 mutations.add(append); 617 break; 618 default: 619 throw new DoNotRetryIOException("invalid mutation type : " + type); 620 } 621 } 622 623 if (mutations.size() == 0) { 624 return new CheckAndMutateResult(true, null); 625 } else { 626 CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations); 627 CheckAndMutateResult result = null; 628 if (region.getCoprocessorHost() != null) { 629 result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate); 630 } 631 if (result == null) { 632 result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce); 633 if (region.getCoprocessorHost() != null) { 634 result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result); 635 } 636 } 637 return result; 638 } 639 } finally { 640 // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner 641 // even if the malformed cells are not skipped. 642 for (int i = countOfCompleteMutation; i < actions.size(); ++i) { 643 skipCellsForMutation(actions.get(i), cellScanner); 644 } 645 } 646 } 647 648 /** 649 * Execute an append mutation. 650 * @return result to return to client if default operation should be bypassed as indicated by 651 * RegionObserver, null otherwise 652 */ 653 private Result append(final HRegion region, final OperationQuota quota, 654 final MutationProto mutation, final CellScanner cellScanner, long nonceGroup, 655 ActivePolicyEnforcement spaceQuota) throws IOException { 656 long before = EnvironmentEdgeManager.currentTime(); 657 Append append = ProtobufUtil.toAppend(mutation, cellScanner); 658 checkCellSizeLimit(region, append); 659 spaceQuota.getPolicyEnforcement(region).check(append); 660 quota.addMutation(append); 661 long nonce = mutation.hasNonce() ? 
mutation.getNonce() : HConstants.NO_NONCE; 662 Result r = region.append(append, nonceGroup, nonce); 663 if (server.getMetrics() != null) { 664 server.getMetrics().updateAppend(region.getTableDescriptor().getTableName(), 665 EnvironmentEdgeManager.currentTime() - before); 666 } 667 return r == null ? Result.EMPTY_RESULT : r; 668 } 669 670 /** 671 * Execute an increment mutation. 672 */ 673 private Result increment(final HRegion region, final OperationQuota quota, 674 final MutationProto mutation, final CellScanner cells, long nonceGroup, 675 ActivePolicyEnforcement spaceQuota) throws IOException { 676 long before = EnvironmentEdgeManager.currentTime(); 677 Increment increment = ProtobufUtil.toIncrement(mutation, cells); 678 checkCellSizeLimit(region, increment); 679 spaceQuota.getPolicyEnforcement(region).check(increment); 680 quota.addMutation(increment); 681 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 682 Result r = region.increment(increment, nonceGroup, nonce); 683 final MetricsRegionServer metricsRegionServer = server.getMetrics(); 684 if (metricsRegionServer != null) { 685 metricsRegionServer.updateIncrement(region.getTableDescriptor().getTableName(), 686 EnvironmentEdgeManager.currentTime() - before); 687 } 688 return r == null ? Result.EMPTY_RESULT : r; 689 } 690 691 /** 692 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when 693 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation. 694 * @param cellsToReturn Could be null. May be allocated in this method. This is what this method 695 * returns as a 'result'. 
696 * @param closeCallBack the callback to be used with multigets 697 * @param context the current RpcCallContext 698 * @return Return the <code>cellScanner</code> passed 699 */ 700 private List<CellScannable> doNonAtomicRegionMutation(final HRegion region, 701 final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner, 702 final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup, 703 final RegionScannersCloseCallBack closeCallBack, RpcCallContext context, 704 ActivePolicyEnforcement spaceQuotaEnforcement) { 705 // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do 706 // one at a time, we instead pass them in batch. Be aware that the corresponding 707 // ResultOrException instance that matches each Put or Delete is then added down in the 708 // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are 709 // deferred/batched 710 List<ClientProtos.Action> mutations = null; 711 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); 712 IOException sizeIOE = null; 713 Object lastBlock = null; 714 ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = 715 ResultOrException.newBuilder(); 716 boolean hasResultOrException = false; 717 for (ClientProtos.Action action : actions.getActionList()) { 718 hasResultOrException = false; 719 resultOrExceptionBuilder.clear(); 720 try { 721 Result r = null; 722 723 if ( 724 context != null && context.isRetryImmediatelySupported() 725 && (context.getResponseCellSize() > maxQuotaResultSize 726 || context.getResponseBlockSize() + context.getResponseExceptionSize() 727 > maxQuotaResultSize) 728 ) { 729 730 // We're storing the exception since the exception and reason string won't 731 // change after the response size limit is reached. 732 if (sizeIOE == null) { 733 // We don't need the stack un-winding do don't throw the exception. 734 // Throwing will kill the JVM's JIT. 
735 // 736 // Instead just create the exception and then store it. 737 sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: " 738 + context.getResponseCellSize() + " BlockSize: " + context.getResponseBlockSize()); 739 740 // Only report the exception once since there's only one request that 741 // caused the exception. Otherwise this number will dominate the exceptions count. 742 rpcServer.getMetrics().exception(sizeIOE); 743 } 744 745 // Now that there's an exception is known to be created 746 // use it for the response. 747 // 748 // This will create a copy in the builder. 749 NameBytesPair pair = ResponseConverter.buildException(sizeIOE); 750 resultOrExceptionBuilder.setException(pair); 751 context.incrementResponseExceptionSize(pair.getSerializedSize()); 752 resultOrExceptionBuilder.setIndex(action.getIndex()); 753 builder.addResultOrException(resultOrExceptionBuilder.build()); 754 skipCellsForMutation(action, cellScanner); 755 continue; 756 } 757 if (action.hasGet()) { 758 long before = EnvironmentEdgeManager.currentTime(); 759 ClientProtos.Get pbGet = action.getGet(); 760 // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do 761 // a get closest before. Throwing the UnknownProtocolException signals it that it needs 762 // to switch and do hbase2 protocol (HBase servers do not tell clients what versions 763 // they are; its a problem for non-native clients like asynchbase. HBASE-20225. 764 if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) { 765 throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? 
" 766 + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " 767 + "reverse Scan."); 768 } 769 try { 770 Get get = ProtobufUtil.toGet(pbGet); 771 if (context != null) { 772 r = get(get, (region), closeCallBack, context); 773 } else { 774 r = region.get(get); 775 } 776 } finally { 777 final MetricsRegionServer metricsRegionServer = server.getMetrics(); 778 if (metricsRegionServer != null) { 779 metricsRegionServer.updateGet(region.getTableDescriptor().getTableName(), 780 EnvironmentEdgeManager.currentTime() - before); 781 } 782 } 783 } else if (action.hasServiceCall()) { 784 hasResultOrException = true; 785 Message result = execServiceOnRegion(region, action.getServiceCall()); 786 ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder = 787 ClientProtos.CoprocessorServiceResult.newBuilder(); 788 resultOrExceptionBuilder.setServiceResult(serviceResultBuilder 789 .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName()) 790 // TODO: Copy!!! 791 .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray())))); 792 } else if (action.hasMutation()) { 793 MutationType type = action.getMutation().getMutateType(); 794 if ( 795 type != MutationType.PUT && type != MutationType.DELETE && mutations != null 796 && !mutations.isEmpty() 797 ) { 798 // Flush out any Puts or Deletes already collected. 
799 doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, 800 spaceQuotaEnforcement); 801 mutations.clear(); 802 } 803 switch (type) { 804 case APPEND: 805 r = append(region, quota, action.getMutation(), cellScanner, nonceGroup, 806 spaceQuotaEnforcement); 807 break; 808 case INCREMENT: 809 r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup, 810 spaceQuotaEnforcement); 811 break; 812 case PUT: 813 case DELETE: 814 // Collect the individual mutations and apply in a batch 815 if (mutations == null) { 816 mutations = new ArrayList<>(actions.getActionCount()); 817 } 818 mutations.add(action); 819 break; 820 default: 821 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); 822 } 823 } else { 824 throw new HBaseIOException("Unexpected Action type"); 825 } 826 if (r != null) { 827 ClientProtos.Result pbResult = null; 828 if (isClientCellBlockSupport(context)) { 829 pbResult = ProtobufUtil.toResultNoData(r); 830 // Hard to guess the size here. Just make a rough guess. 831 if (cellsToReturn == null) { 832 cellsToReturn = new ArrayList<>(); 833 } 834 cellsToReturn.add(r); 835 } else { 836 pbResult = ProtobufUtil.toResult(r); 837 } 838 lastBlock = addSize(context, r, lastBlock); 839 hasResultOrException = true; 840 resultOrExceptionBuilder.setResult(pbResult); 841 } 842 // Could get to here and there was no result and no exception. Presumes we added 843 // a Put or Delete to the collecting Mutations List for adding later. In this 844 // case the corresponding ResultOrException instance for the Put or Delete will be added 845 // down in the doNonAtomicBatchOp method call rather than up here. 
846 } catch (IOException ie) { 847 rpcServer.getMetrics().exception(ie); 848 hasResultOrException = true; 849 NameBytesPair pair = ResponseConverter.buildException(ie); 850 resultOrExceptionBuilder.setException(pair); 851 context.incrementResponseExceptionSize(pair.getSerializedSize()); 852 } 853 if (hasResultOrException) { 854 // Propagate index. 855 resultOrExceptionBuilder.setIndex(action.getIndex()); 856 builder.addResultOrException(resultOrExceptionBuilder.build()); 857 } 858 } 859 // Finish up any outstanding mutations 860 if (!CollectionUtils.isEmpty(mutations)) { 861 doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); 862 } 863 return cellsToReturn; 864 } 865 866 private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException { 867 if (r.maxCellSize > 0) { 868 CellScanner cells = m.cellScanner(); 869 while (cells.advance()) { 870 int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current()); 871 if (size > r.maxCellSize) { 872 String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of " 873 + r.maxCellSize + " bytes"; 874 LOG.debug(msg); 875 throw new DoNotRetryIOException(msg); 876 } 877 } 878 } 879 } 880 881 private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, 882 final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells, 883 long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { 884 // Just throw the exception. The exception will be caught and then added to region-level 885 // exception for RegionAction. Leaving the null to action result is ok since the null 886 // result is viewed as failure by hbase client. And the region-lever exception will be used 887 // to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and 888 // AsyncBatchRpcRetryingCaller#onComplete for more details. 
    doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
  }

  /**
   * Execute the given mutations as one non-atomic batch, converting a thrown IOException into a
   * per-action exception result instead of a region-level failure.
   */
  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    try {
      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
        spaceQuotaEnforcement, false);
    } catch (IOException e) {
      // Set the exception for each action. The mutations in same RegionAction are group to
      // different batch and then be processed individually. Hence, we don't set the region-level
      // exception here for whole RegionAction.
      for (Action mutation : mutations) {
        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
      }
    }
  }

  /**
   * Execute a list of mutations as a single batch against the region.
   * @param builder receives one ResultOrException per action
   * @param atomic  when true the batch came from mutateRow/RowMutations: ordering is preserved and
   *                Increment/Append results are merged into the first ResultOrException
   */
  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
    throws IOException {
    Mutation[] mArray = new Mutation[mutations.size()];
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      /**
       * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions
       * since mutation array may have been reoredered.In order to return the right result or
       * exception to the corresponding actions, We need to know which action is the mutation belong
       * to. We can't sort ClientProtos.Action array, since they are bonded to cellscanners.
       */
      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
      int i = 0;
      // Last Increment/Append nonce seen wins; Puts/Deletes carry no nonce here.
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : mutations) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto m = action.getMutation();
        Mutation mutation;
        switch (m.getMutateType()) {
          case PUT:
            mutation = ProtobufUtil.toPut(m, cells);
            batchContainsPuts = true;
            break;

          case DELETE:
            mutation = ProtobufUtil.toDelete(m, cells);
            batchContainsDelete = true;
            break;

          case INCREMENT:
            mutation = ProtobufUtil.toIncrement(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          case APPEND:
            mutation = ProtobufUtil.toAppend(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          default:
            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
        }
        mutationActionMap.put(mutation, action);
        mArray[i++] = mutation;
        checkCellSizeLimit(region, mutation);
        // Check if a space quota disallows this mutation
        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
        quota.addMutation(mutation);
      }

      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }

      // HBASE-17924
      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
      // order is preserved as its expected from the client
      if (!atomic) {
        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
      }

      OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);

      // When atomic is true, it indicates that the mutateRow API or the batch API with
      // RowMutations is called. In this case, we need to merge the results of the
      // Increment/Append operations if the mutations include those operations, and set the merged
      // result to the first element of the ResultOrException list
      if (atomic) {
        List<ResultOrException> resultOrExceptions = new ArrayList<>();
        List<Result> results = new ArrayList<>();
        for (i = 0; i < codes.length; i++) {
          if (codes[i].getResult() != null) {
            results.add(codes[i].getResult());
          }
          // Elements 1..n-1 get default (empty) results; element 0 is filled in below.
          if (i != 0) {
            resultOrExceptions
              .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i));
          }
        }

        if (results.isEmpty()) {
          builder.addResultOrException(
            getResultOrException(ClientProtos.Result.getDefaultInstance(), 0));
        } else {
          // Merge the results of the Increment/Append operations
          List<Cell> cellList = new ArrayList<>();
          for (Result result : results) {
            if (result.rawCells() != null) {
              cellList.addAll(Arrays.asList(result.rawCells()));
            }
          }
          Result result = Result.create(cellList);

          // Set the merged result of the Increment/Append operations to the first element of the
          // ResultOrException list
          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
        }

        builder.addAllResultOrException(resultOrExceptions);
        return;
      }

      for (i = 0; i < codes.length; i++) {
        Mutation currentMutation = mArray[i];
        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
        Exception e;
        // NOTE(review): 'default' appears before SUCCESS/STORE_TOO_BUSY. Java matches labeled
        // cases regardless of their position relative to 'default', so behavior is correct, but
        // moving 'default' last would read more conventionally.
        switch (codes[i].getOperationStatusCode()) {
          case BAD_FAMILY:
            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SANITY_CHECK_FAILURE:
            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          default:
            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SUCCESS:
            builder.addResultOrException(
              getResultOrException(ClientProtos.Result.getDefaultInstance(), index));
            break;

          case STORE_TOO_BUSY:
            e = new RegionTooBusyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;
        }
      }
    } finally {
      int processedMutationIndex = 0;
      for (Action mutation : mutations) {
        // The non-null mArray[i] means the cell scanner has been read.
        if (mArray[processedMutationIndex++] == null) {
          skipCellsForMutation(mutation, cells);
        }
      }
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  /**
   * Record batch put/delete latency metrics for the region's table, if metrics are enabled.
   */
  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
    boolean batchContainsDelete) {
    final MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      if (batchContainsPuts) {
        metricsRegionServer.updatePutBatch(region.getTableDescriptor().getTableName(),
          after - starttime);
      }
      if (batchContainsDelete) {
        metricsRegionServer.updateDeleteBatch(region.getTableDescriptor().getTableName(),
          after - starttime);
      }
    }
  }

  /**
   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
   *         exceptionMessage if any
   * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying
   *             edits for secondary replicas any more, see
   *             {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}.
   */
  @Deprecated
  private OperationStatus[] doReplayBatchOp(final HRegion region,
    final List<MutationReplay> mutations, long replaySeqId) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
        MutationReplay m = it.next();

        if (m.getType() == MutationType.PUT) {
          batchContainsPuts = true;
        } else {
          batchContainsDelete = true;
        }

        // WAL edits in METAFAMILY are markers (compaction/flush/region-event/bulk-load), not
        // data; replay them directly and drop the whole entry from the batch below.
        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
        if (metaCells != null && !metaCells.isEmpty()) {
          for (Cell metaCell : metaCells) {
            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
            HRegion hRegion = region;
            if (compactionDesc != null) {
              // replay the compaction. Remove the files from stores only if we are the primary
              // region replica (thus own the files)
              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
                replaySeqId);
              continue;
            }
            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
            if (flushDesc != null && !isDefaultReplica) {
              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
              continue;
            }
            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
            if (regionEvent != null && !isDefaultReplica) {
              hRegion.replayWALRegionEventMarker(regionEvent);
              continue;
            }
            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
            if (bulkLoadEvent != null) {
              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
              continue;
            }
          }
          // Marker entries are consumed above; remove so only data mutations reach batchReplay.
          it.remove();
        }
      }
      requestCount.increment();
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
        replaySeqId);
    } finally {
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  private void closeAllScanners() {
    // Close any outstanding scanners. Means they'll get an UnknownScanner
    // exception next time they come in.
    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
      try {
        e.getValue().s.close();
      } catch (IOException ioe) {
        // Best-effort shutdown: log and keep closing the remaining scanners.
        LOG.warn("Closing scanner " + e.getKey(), ioe);
      }
    }
  }

  // Directly invoked only for testing
  // Reads size/timeout thresholds from the server's Configuration and wires the rpc server's
  // named-queue recorder and the closed-scanner cache.
  public RSRpcServices(final HRegionServer rs) throws IOException {
    super(rs, rs.getProcessName());
    final Configuration conf = rs.getConfiguration();
    rowSizeWarnThreshold =
      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    rejectRowsWithSizeOverThreshold =
      conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
    rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
    rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
    closedScanners = CacheBuilder.newBuilder()
      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }

  @Override
  protected boolean defaultReservoirEnabled() {
    return true;
  }

  @Override
  protected ServerType getDNSServerType() {
    return DNS.ServerType.REGIONSERVER;
  }

  @Override
  protected String getHostname(Configuration conf, String defaultHostname) {
    return conf.get("hbase.regionserver.ipc.address", defaultHostname);
  }

  @Override
  protected String getPortConfigName() {
    return HConstants.REGIONSERVER_PORT;
  }

  @Override
  protected int getDefaultPort() {
    return HConstants.DEFAULT_REGIONSERVER_PORT;
  }

  @Override
  protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  /**
   * Create the RpcServer listening on the given bind address, translating a BindException into an
   * IOException that points at the port configuration property.
   */
  protected RpcServerInterface createRpcServer(final Server server,
    final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
    final String name) throws IOException {
    final Configuration conf = server.getConfiguration();
    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
    try {
      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, // use final
                                                                                        // bindAddress
                                                                                        // for this
                                                                                        // server.
        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
    } catch (BindException be) {
      throw new IOException(be.getMessage() + ". To switch ports use the '"
        + HConstants.REGIONSERVER_PORT + "' configuration property.",
        be.getCause() != null ? be.getCause() : be);
    }
  }

  protected Class<?> getRpcSchedulerFactoryClass() {
    final Configuration conf = server.getConfiguration();
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  protected PriorityFunction createPriority() {
    return new RSAnnotationReadingPriorityFunction(this);
  }

  /** @return the number of currently outstanding scanners. */
  public int getScannersCount() {
    return scanners.size();
  }

  /**
   * @return The outstanding RegionScanner for <code>scannerId</code> or null if none found.
   */
  RegionScanner getScanner(long scannerId) {
    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
    return rsh == null ? null : rsh.s;
  }

  /**
   * @return The associated RegionScannerHolder for <code>scannerId</code> or null.
   */
  private RegionScannerHolder getRegionScannerHolder(long scannerId) {
    return scanners.get(toScannerName(scannerId));
  }

  /**
   * @return a human-readable "table/region/operation_id" description of the scanner, or null if
   *         the scanner id is unknown.
   */
  public String getScanDetailsWithId(long scannerId) {
    RegionScanner scanner = getScanner(scannerId);
    if (scanner == null) {
      return null;
    }
    StringBuilder builder = new StringBuilder();
    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
    builder.append(" operation_id: ").append(scanner.getOperationId());
    return builder.toString();
  }

  /**
   * @return a human-readable description of the scan request's target table/region (plus the
   *         operation id attribute when present), or null if it cannot be resolved.
   */
  public String getScanDetailsWithRequest(ScanRequest request) {
    try {
      if (!request.hasRegion()) {
        return null;
      }
      Region region = getRegion(request.getRegion());
      StringBuilder builder = new StringBuilder();
      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
      for (NameBytesPair pair : request.getScan().getAttributeList()) {
        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
          break;
        }
      }
      return builder.toString();
    } catch (IOException ignored) {
      // Used for logging/diagnostics only; a lookup failure simply yields no details.
      return null;
    }
  }

  /**
   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
   */
  long getScannerVirtualTime(long scannerId) {
    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
    return rsh == null ? 0L : rsh.getNextCallSeq();
  }

  /**
   * Method to account for the size of retained cells and retained data blocks.
   * @param context   rpc call context
   * @param r         result to add size.
   * @param lastBlock last block to check whether we need to add the block size in context.
   * @return an object that represents the last referenced block from this response.
   */
  Object addSize(RpcCallContext context, Result r, Object lastBlock) {
    if (context != null && r != null && !r.isEmpty()) {
      for (Cell c : r.rawCells()) {
        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));

        // Since byte buffers can point all kinds of crazy places it's harder to keep track
        // of which blocks are kept alive by what byte buffer.
        // So we make a guess.
        if (c instanceof ByteBufferExtendedCell) {
          ByteBufferExtendedCell bbCell = (ByteBufferExtendedCell) c;
          ByteBuffer bb = bbCell.getValueByteBuffer();
          if (bb != lastBlock) {
            context.incrementResponseBlockSize(bb.capacity());
            lastBlock = bb;
          }
        } else {
          // We're using the last block being the same as the current block as
          // a proxy for pointing to a new block. This won't be exact.
          // If there are multiple gets that bounce back and forth
          // Then it's possible that this will over count the size of
          // referenced blocks. However it's better to over count and
          // use two rpcs than to OOME the regionserver.
          byte[] valueArray = c.getValueArray();
          if (valueArray != lastBlock) {
            context.incrementResponseBlockSize(valueArray.length);
            lastBlock = valueArray;
          }
        }

      }
    }
    return lastBlock;
  }

  /**
   * @return Remote client's ip and port else null if can't be determined.
   */
  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
  static String getRemoteClientIpAndPort() {
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    if (rpcCall == null) {
      return HConstants.EMPTY_STRING;
    }
    InetAddress address = rpcCall.getRemoteAddress();
    if (address == null) {
      return HConstants.EMPTY_STRING;
    }
    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
    // scanning than a hostname anyways.
    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
  }

  /**
   * @return Remote client's username.
   */
  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
  static String getUserName() {
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    if (rpcCall == null) {
      return HConstants.EMPTY_STRING;
    }
    return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING);
  }

  /**
   * Register a new RegionScanner under the given name, creating its lease and the callbacks fired
   * on shipped/close.
   * @throws LeaseStillHeldException if a lease already exists for <code>scannerName</code>
   */
  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
    HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
    Lease lease = server.getLeaseManager().createLease(scannerName, this.scannerLeaseTimeoutPeriod,
      new ScannerListener(scannerName));
    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
    // Reuse the scanner itself as the close callback when it implements RpcCallback.
    RpcCallback closeCallback =
      s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s);
    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
      needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
      + scannerName + ", " + existing;
    return rsh;
  }

  private boolean isFullRegionScan(Scan scan, HRegion region) {
    // If the scan start row equals or less than the start key of the region
    // and stop row greater than equals end key (if stop row present)
    // or if the stop row is empty
    // account this as a full region scan
    if (
      Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0
        && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0
          && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)
          || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
    ) {
      return true;
    }
    return false;
  }

  /**
   * Find the HRegion based on a region specifier
   * @param regionSpecifier the region specifier
   * @return the corresponding region
   * @throws IOException if the specifier is not null, but failed to find the region
   */
  public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException {
    return server.getRegion(regionSpecifier.getValue().toByteArray());
  }

  /**
   * Find the List of HRegions based on a list of region specifiers
   * @param regionSpecifiers the list of region specifiers
   * @return the corresponding list of regions
   * @throws IOException if any of the specifiers is not null, but failed to find the region
   */
  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
    final CacheEvictionStatsBuilder stats) {
    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
    for (RegionSpecifier regionSpecifier : regionSpecifiers) {
      try {
        regions.add(server.getRegion(regionSpecifier.getValue().toByteArray()));
      } catch (NotServingRegionException e) {
        // Record the miss in the stats instead of failing the whole lookup.
        stats.addException(regionSpecifier.getValue().toByteArray(), e);
      }
    }
    return regions;
  }

  PriorityFunction getPriority() {
    return priority;
  }

  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return server.getRegionServerRpcQuotaManager();
  }

  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return server.getRegionServerSpaceQuotaManager();
  }

  void start(ZKWatcher zkWatcher) {
    this.scannerIdGenerator = new ScannerIdGenerator(this.server.getServerName());
    internalStart(zkWatcher);
  }

  void stop() {
    closeAllScanners();
    internalStop();
  }

  /**
   * Called to verify that this server is up and running.
   */
  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
  protected void checkOpen() throws IOException {
    if (server.isAborted()) {
      throw new RegionServerAbortedException("Server " + server.getServerName() + " aborting");
    }
    if (server.isStopped()) {
      throw new RegionServerStoppedException("Server " + server.getServerName() + " stopping");
    }
    if (!server.isDataFileSystemOk()) {
      throw new RegionServerStoppedException("File system not available");
    }
    if (!server.isOnline()) {
      throw new ServerNotRunningYetException(
        "Server " + server.getServerName() + " is not running yet");
    }
  }

  /**
   * By default, put up an Admin and a Client Service. Set booleans
   * <code>hbase.regionserver.admin.executorService</code> and
   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
   * Default is that both are enabled.
   * @return immutable list of blocking services and the security info classes that this server
   *         supports
   */
  protected List<BlockingServiceAndInterface> getServices() {
    boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
    boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
    boolean clientMeta =
      getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
    if (client) {
      bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
        ClientService.BlockingInterface.class));
    }
    if (admin) {
      bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
        AdminService.BlockingInterface.class));
    }
    if (clientMeta) {
      bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
        ClientMetaService.BlockingInterface.class));
    }
    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
  }

  /**
   * Close a region on the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public CloseRegionResponse closeRegion(final RpcController controller,
    final CloseRegionRequest request) throws ServiceException {
    final ServerName sn = (request.hasDestinationServer()
      ?
ProtobufUtil.toServerName(request.getDestinationServer()) 1515 : null); 1516 1517 try { 1518 checkOpen(); 1519 throwOnWrongStartCode(request); 1520 final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion()); 1521 1522 requestCount.increment(); 1523 if (sn == null) { 1524 LOG.info("Close " + encodedRegionName + " without moving"); 1525 } else { 1526 LOG.info("Close " + encodedRegionName + ", moving to " + sn); 1527 } 1528 boolean closed = server.closeRegion(encodedRegionName, false, sn); 1529 CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed); 1530 return builder.build(); 1531 } catch (IOException ie) { 1532 throw new ServiceException(ie); 1533 } 1534 } 1535 1536 /** 1537 * Compact a region on the region server. 1538 * @param controller the RPC controller 1539 * @param request the request n 1540 */ 1541 @Override 1542 @QosPriority(priority = HConstants.ADMIN_QOS) 1543 public CompactRegionResponse compactRegion(final RpcController controller, 1544 final CompactRegionRequest request) throws ServiceException { 1545 try { 1546 checkOpen(); 1547 requestCount.increment(); 1548 HRegion region = getRegion(request.getRegion()); 1549 // Quota support is enabled, the requesting user is not system/super user 1550 // and a quota policy is enforced that disables compactions. 
1551 if ( 1552 QuotaUtil.isQuotaEnabled(getConfiguration()) 1553 && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null)) 1554 && this.server.getRegionServerSpaceQuotaManager() 1555 .areCompactionsDisabled(region.getTableDescriptor().getTableName()) 1556 ) { 1557 throw new DoNotRetryIOException( 1558 "Compactions on this region are " + "disabled due to a space quota violation."); 1559 } 1560 region.startRegionOperation(Operation.COMPACT_REGION); 1561 LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString()); 1562 boolean major = request.hasMajor() && request.getMajor(); 1563 if (request.hasFamily()) { 1564 byte[] family = request.getFamily().toByteArray(); 1565 String log = "User-triggered " + (major ? "major " : "") + "compaction for region " 1566 + region.getRegionInfo().getRegionNameAsString() + " and family " 1567 + Bytes.toString(family); 1568 LOG.trace(log); 1569 region.requestCompaction(family, log, Store.PRIORITY_USER, major, 1570 CompactionLifeCycleTracker.DUMMY); 1571 } else { 1572 String log = "User-triggered " + (major ? 
"major " : "") + "compaction for region " 1573 + region.getRegionInfo().getRegionNameAsString(); 1574 LOG.trace(log); 1575 region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY); 1576 } 1577 return CompactRegionResponse.newBuilder().build(); 1578 } catch (IOException ie) { 1579 throw new ServiceException(ie); 1580 } 1581 } 1582 1583 @Override 1584 public CompactionSwitchResponse compactionSwitch(RpcController controller, 1585 CompactionSwitchRequest request) throws ServiceException { 1586 rpcPreCheck("compactionSwitch"); 1587 final CompactSplit compactSplitThread = server.getCompactSplitThread(); 1588 requestCount.increment(); 1589 boolean prevState = compactSplitThread.isCompactionsEnabled(); 1590 CompactionSwitchResponse response = 1591 CompactionSwitchResponse.newBuilder().setPrevState(prevState).build(); 1592 if (prevState == request.getEnabled()) { 1593 // passed in requested state is same as current state. No action required 1594 return response; 1595 } 1596 compactSplitThread.switchCompaction(request.getEnabled()); 1597 return response; 1598 } 1599 1600 /** 1601 * Flush a region on the region server. 1602 * @param controller the RPC controller 1603 * @param request the request n 1604 */ 1605 @Override 1606 @QosPriority(priority = HConstants.ADMIN_QOS) 1607 public FlushRegionResponse flushRegion(final RpcController controller, 1608 final FlushRegionRequest request) throws ServiceException { 1609 try { 1610 checkOpen(); 1611 requestCount.increment(); 1612 HRegion region = getRegion(request.getRegion()); 1613 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); 1614 boolean shouldFlush = true; 1615 if (request.hasIfOlderThanTs()) { 1616 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs(); 1617 } 1618 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); 1619 if (shouldFlush) { 1620 boolean writeFlushWalMarker = 1621 request.hasWriteFlushWalMarker() ? 
request.getWriteFlushWalMarker() : false; 1622 // Go behind the curtain so we can manage writing of the flush WAL marker 1623 HRegion.FlushResultImpl flushResult = null; 1624 if (request.hasFamily()) { 1625 List families = new ArrayList(); 1626 families.add(request.getFamily().toByteArray()); 1627 flushResult = 1628 region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1629 } else { 1630 flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1631 } 1632 boolean compactionNeeded = flushResult.isCompactionNeeded(); 1633 if (compactionNeeded) { 1634 server.getCompactSplitThread().requestSystemCompaction(region, 1635 "Compaction through user triggered flush"); 1636 } 1637 builder.setFlushed(flushResult.isFlushSucceeded()); 1638 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker); 1639 } 1640 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores()); 1641 return builder.build(); 1642 } catch (DroppedSnapshotException ex) { 1643 // Cache flush can fail in a few places. If it fails in a critical 1644 // section, we get a DroppedSnapshotException and a replay of wal 1645 // is required. Currently the only way to do this is a restart of 1646 // the server. 1647 server.abort("Replay of WAL required. 
Forcing server shutdown", ex); 1648 throw new ServiceException(ex); 1649 } catch (IOException ie) { 1650 throw new ServiceException(ie); 1651 } 1652 } 1653 1654 @Override 1655 @QosPriority(priority = HConstants.ADMIN_QOS) 1656 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, 1657 final GetOnlineRegionRequest request) throws ServiceException { 1658 try { 1659 checkOpen(); 1660 requestCount.increment(); 1661 Map<String, HRegion> onlineRegions = server.getOnlineRegions(); 1662 List<RegionInfo> list = new ArrayList<>(onlineRegions.size()); 1663 for (HRegion region : onlineRegions.values()) { 1664 list.add(region.getRegionInfo()); 1665 } 1666 list.sort(RegionInfo.COMPARATOR); 1667 return ResponseConverter.buildGetOnlineRegionResponse(list); 1668 } catch (IOException ie) { 1669 throw new ServiceException(ie); 1670 } 1671 } 1672 1673 // Master implementation of this Admin Service differs given it is not 1674 // able to supply detail only known to RegionServer. See note on 1675 // MasterRpcServers#getRegionInfo. 
1676 @Override 1677 @QosPriority(priority = HConstants.ADMIN_QOS) 1678 public GetRegionInfoResponse getRegionInfo(final RpcController controller, 1679 final GetRegionInfoRequest request) throws ServiceException { 1680 try { 1681 checkOpen(); 1682 requestCount.increment(); 1683 HRegion region = getRegion(request.getRegion()); 1684 RegionInfo info = region.getRegionInfo(); 1685 byte[] bestSplitRow; 1686 if (request.hasBestSplitRow() && request.getBestSplitRow()) { 1687 bestSplitRow = region.checkSplit(true).orElse(null); 1688 // when all table data are in memstore, bestSplitRow = null 1689 // try to flush region first 1690 if (bestSplitRow == null) { 1691 region.flush(true); 1692 bestSplitRow = region.checkSplit(true).orElse(null); 1693 } 1694 } else { 1695 bestSplitRow = null; 1696 } 1697 GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder(); 1698 builder.setRegionInfo(ProtobufUtil.toRegionInfo(info)); 1699 if (request.hasCompactionState() && request.getCompactionState()) { 1700 builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState())); 1701 } 1702 builder.setSplittable(region.isSplittable()); 1703 builder.setMergeable(region.isMergeable()); 1704 if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) { 1705 builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow)); 1706 } 1707 return builder.build(); 1708 } catch (IOException ie) { 1709 throw new ServiceException(ie); 1710 } 1711 } 1712 1713 @Override 1714 @QosPriority(priority = HConstants.ADMIN_QOS) 1715 public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request) 1716 throws ServiceException { 1717 1718 List<HRegion> regions; 1719 if (request.hasTableName()) { 1720 TableName tableName = ProtobufUtil.toTableName(request.getTableName()); 1721 regions = server.getRegions(tableName); 1722 } else { 1723 regions = server.getRegions(); 1724 } 1725 List<RegionLoad> rLoads = new 
ArrayList<>(regions.size()); 1726 RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder(); 1727 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder(); 1728 1729 try { 1730 for (HRegion region : regions) { 1731 rLoads.add(server.createRegionLoad(region, regionLoadBuilder, regionSpecifier)); 1732 } 1733 } catch (IOException e) { 1734 throw new ServiceException(e); 1735 } 1736 GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder(); 1737 builder.addAllRegionLoads(rLoads); 1738 return builder.build(); 1739 } 1740 1741 @Override 1742 @QosPriority(priority = HConstants.ADMIN_QOS) 1743 public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller, 1744 ClearCompactionQueuesRequest request) throws ServiceException { 1745 LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/" 1746 + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue"); 1747 ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder(); 1748 requestCount.increment(); 1749 if (clearCompactionQueues.compareAndSet(false, true)) { 1750 final CompactSplit compactSplitThread = server.getCompactSplitThread(); 1751 try { 1752 checkOpen(); 1753 server.getRegionServerCoprocessorHost().preClearCompactionQueues(); 1754 for (String queueName : request.getQueueNameList()) { 1755 LOG.debug("clear " + queueName + " compaction queue"); 1756 switch (queueName) { 1757 case "long": 1758 compactSplitThread.clearLongCompactionsQueue(); 1759 break; 1760 case "short": 1761 compactSplitThread.clearShortCompactionsQueue(); 1762 break; 1763 default: 1764 LOG.warn("Unknown queue name " + queueName); 1765 throw new IOException("Unknown queue name " + queueName); 1766 } 1767 } 1768 server.getRegionServerCoprocessorHost().postClearCompactionQueues(); 1769 } catch (IOException ie) { 1770 throw new ServiceException(ie); 1771 } finally { 1772 clearCompactionQueues.set(false); 1773 } 1774 } 
else { 1775 LOG.warn("Clear compactions queue is executing by other admin."); 1776 } 1777 return respBuilder.build(); 1778 } 1779 1780 /** 1781 * Get some information of the region server. 1782 * @param controller the RPC controller 1783 * @param request the request n 1784 */ 1785 @Override 1786 @QosPriority(priority = HConstants.ADMIN_QOS) 1787 public GetServerInfoResponse getServerInfo(final RpcController controller, 1788 final GetServerInfoRequest request) throws ServiceException { 1789 try { 1790 checkOpen(); 1791 } catch (IOException ie) { 1792 throw new ServiceException(ie); 1793 } 1794 requestCount.increment(); 1795 int infoPort = server.getInfoServer() != null ? server.getInfoServer().getPort() : -1; 1796 return ResponseConverter.buildGetServerInfoResponse(server.getServerName(), infoPort); 1797 } 1798 1799 @Override 1800 @QosPriority(priority = HConstants.ADMIN_QOS) 1801 public GetStoreFileResponse getStoreFile(final RpcController controller, 1802 final GetStoreFileRequest request) throws ServiceException { 1803 try { 1804 checkOpen(); 1805 HRegion region = getRegion(request.getRegion()); 1806 requestCount.increment(); 1807 Set<byte[]> columnFamilies; 1808 if (request.getFamilyCount() == 0) { 1809 columnFamilies = region.getTableDescriptor().getColumnFamilyNames(); 1810 } else { 1811 columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR); 1812 for (ByteString cf : request.getFamilyList()) { 1813 columnFamilies.add(cf.toByteArray()); 1814 } 1815 } 1816 int nCF = columnFamilies.size(); 1817 List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][])); 1818 GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder(); 1819 builder.addAllStoreFile(fileList); 1820 return builder.build(); 1821 } catch (IOException ie) { 1822 throw new ServiceException(ie); 1823 } 1824 } 1825 1826 private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException { 1827 if (!request.hasServerStartCode()) { 1828 
LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList()); 1829 return; 1830 } 1831 throwOnWrongStartCode(request.getServerStartCode()); 1832 } 1833 1834 private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException { 1835 if (!request.hasServerStartCode()) { 1836 LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion()); 1837 return; 1838 } 1839 throwOnWrongStartCode(request.getServerStartCode()); 1840 } 1841 1842 private void throwOnWrongStartCode(long serverStartCode) throws ServiceException { 1843 // check that we are the same server that this RPC is intended for. 1844 if (server.getServerName().getStartcode() != serverStartCode) { 1845 throw new ServiceException(new DoNotRetryIOException( 1846 "This RPC was intended for a " + "different server with startCode: " + serverStartCode 1847 + ", this server is: " + server.getServerName())); 1848 } 1849 } 1850 1851 private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException { 1852 if (req.getOpenRegionCount() > 0) { 1853 for (OpenRegionRequest openReq : req.getOpenRegionList()) { 1854 throwOnWrongStartCode(openReq); 1855 } 1856 } 1857 if (req.getCloseRegionCount() > 0) { 1858 for (CloseRegionRequest closeReq : req.getCloseRegionList()) { 1859 throwOnWrongStartCode(closeReq); 1860 } 1861 } 1862 } 1863 1864 /** 1865 * Open asynchronously a region or a set of regions on the region server. The opening is 1866 * coordinated by ZooKeeper, and this method requires the znode to be created before being called. 1867 * As a consequence, this method should be called only from the master. 1868 * <p> 1869 * Different manages states for the region are: 1870 * </p> 1871 * <ul> 1872 * <li>region not opened: the region opening will start asynchronously.</li> 1873 * <li>a close is already in progress: this is considered as an error.</li> 1874 * <li>an open is already in progress: this new open request will be ignored. 
This is important
   * because the Master can do multiple requests if it crashes.</li>
   * <li>the region is already opened: this new open request will be ignored.</li>
   * </ul>
   * <p>
   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
   * errors are put in the response as FAILED_OPENING.
   * </p>
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public OpenRegionResponse openRegion(final RpcController controller,
    final OpenRegionRequest request) throws ServiceException {
    requestCount.increment();
    throwOnWrongStartCode(request);

    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
    final int regionCount = request.getOpenInfoCount();
    // Cache of table descriptors so a bulk assign looks each table up only once.
    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
    final boolean isBulkAssign = regionCount > 1;
    try {
      checkOpen();
    } catch (IOException ie) {
      // checkOpen failed. Normally that is fatal for the request, but a single-region open of
      // meta gets special treatment below: we wait for the server to come online.
      TableName tableName = null;
      if (regionCount == 1) {
        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
          request.getOpenInfo(0).getRegion();
        if (ri != null) {
          tableName = ProtobufUtil.toTableName(ri.getTableName());
        }
      }
      if (!TableName.META_TABLE_NAME.equals(tableName)) {
        throw new ServiceException(ie);
      }
      // We are assigning meta, wait a little for regionserver to finish initialization.
      // Default to quarter of RPC timeout
      int timeout = server.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
      synchronized (server.online) {
        try {
          // Poll until the server reports online, is stopped, or the timeout elapses,
          // then re-run checkOpen() to surface the final state.
          while (
            EnvironmentEdgeManager.currentTime() <= endTime && !server.isStopped()
              && !server.isOnline()
          ) {
            server.online.wait(server.getMsgInterval());
          }
          checkOpen();
        } catch (InterruptedException t) {
          Thread.currentThread().interrupt();
          throw new ServiceException(t);
        } catch (IOException e) {
          throw new ServiceException(e);
        }
      }
    }

    // -1 means "not supplied"; consumers of masterSystemTime handle that sentinel.
    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;

    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableDescriptor htd;
      try {
        String encodedName = region.getEncodedName();
        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
        final HRegion onlineRegion = server.getRegion(encodedName);
        if (onlineRegion != null) {
          // The region is already online. This should not happen any more.
          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
            + ", which is already online";
          LOG.warn(error);
          // server.abort(error);
          // throw new IOException(error);
          builder.addOpeningState(RegionOpeningState.OPENED);
          continue;
        }
        LOG.info("Open " + region.getRegionNameAsString());

        // Mark the region as in-transition; `previous` tells us what, if anything,
        // was already happening for this region on this server.
        final Boolean previous =
          server.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);

        if (Boolean.FALSE.equals(previous)) {
          if (server.getRegion(encodedName) != null) {
            // There is a close in progress. This should not happen any more.
            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
              + ", which we are already trying to CLOSE";
            server.abort(error);
            throw new IOException(error);
          }
          server.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
        }

        if (Boolean.TRUE.equals(previous)) {
          // An open is in progress. This is supported, but let's log this.
          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
            + ", which we are already trying to OPEN"
            + " - ignoring this new request for this region.");
        }

        // We are opening this region. If it moves back and forth for whatever reason, we don't
        // want to keep returning the stale moved record while we are opening/if we close again.
        server.removeFromMovedRegions(region.getEncodedName());

        if (previous == null || !previous.booleanValue()) {
          htd = htds.get(region.getTable());
          if (htd == null) {
            htd = server.getTableDescriptors().get(region.getTable());
            htds.put(region.getTable(), htd);
          }
          if (htd == null) {
            throw new IOException("Missing table descriptor for " + region.getEncodedName());
          }
          // If there is no action in progress, we can submit a specific handler.
          // Need to pass the expected version in the constructor.
          if (server.getExecutorService() == null) {
            LOG.info("No executor executorService; skipping open request");
          } else {
            if (region.isMetaRegion()) {
              server.getExecutorService()
                .submit(new OpenMetaHandler(server, server, region, htd, masterSystemTime));
            } else {
              if (regionOpenInfo.getFavoredNodesCount() > 0) {
                server.updateRegionFavoredNodesMapping(region.getEncodedName(),
                  regionOpenInfo.getFavoredNodesList());
              }
              // High-priority tables and system tables open on the priority handler pool.
              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
                server.getExecutorService().submit(
                  new OpenPriorityRegionHandler(server, server, region, htd, masterSystemTime));
              } else {
                server.getExecutorService()
                  .submit(new OpenRegionHandler(server, server, region, htd, masterSystemTime));
              }
            }
          }
        }

        // OPENED here means "open accepted/underway"; the handlers above complete it async.
        builder.addOpeningState(RegionOpeningState.OPENED);
      } catch (IOException ie) {
        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
        if (isBulkAssign) {
          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
        } else {
          throw new ServiceException(ie);
        }
      }
    }
    return builder.build();
  }

  /**
   * Warmup a region on this server. This method should only be called by Master. It synchronously
   * opens the region and closes the region bringing the most important pages in cache.
   */
  @Override
  public WarmupRegionResponse warmupRegion(final RpcController controller,
    final WarmupRegionRequest request) throws ServiceException {
    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
    try {
      checkOpen();
      String encodedName = region.getEncodedName();
      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
      final HRegion onlineRegion = server.getRegion(encodedName);
      if (onlineRegion != null) {
        LOG.info("{} is online; skipping warmup", region);
        return response;
      }
      TableDescriptor htd = server.getTableDescriptors().get(region.getTable());
      if (server.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
        LOG.info("{} is in transition; skipping warmup", region);
        return response;
      }
      LOG.info("Warmup {}", region.getRegionNameAsString());
      HRegion.warmupHRegion(region, htd, server.getWAL(region), server.getConfiguration(), server,
        null);
    } catch (IOException ie) {
      LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
      throw new ServiceException(ie);
    }

    return response;
  }

  /**
   * Detaches and returns the controller's CellScanner, clearing it on the controller so the
   * payload is consumed exactly once.
   */
  private CellScanner getAndReset(RpcController controller) {
    HBaseRpcController hrc = (HBaseRpcController) controller;
    CellScanner cells = hrc.cellScanner();
    hrc.setCellScanner(null);
    return cells;
  }

  /**
   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
   * that the given mutations will be durable on the receiving RS if this method returns without any
   * exception.
   * @param controller the RPC controller
   * @param request    the request
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for
   *             compatibility with old region replica implementation. Now we will use
   *             {@code replicateToReplica} method instead.
   */
  @Deprecated
  @Override
  @QosPriority(priority = HConstants.REPLAY_QOS)
  public ReplicateWALEntryResponse replay(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    CellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      // All entries in one request must belong to the same region (enforced below).
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      RegionCoprocessorHost coprocessorHost =
        ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
          ? region.getCoprocessorHost()
          : null; // do not invoke coprocessors if this is a secondary region replica
      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();

      // Skip adding the edits to WAL if this is a secondary region replica
      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;

      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException("Replay request contains entries from multiple "
            + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
            + entry.getKey().getEncodedRegionName());
        }
        // Report replayed nonces so retried client operations are still deduplicated.
        if (server.nonceManager != null && isPrimary) {
          long nonceGroup =
            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          server.nonceManager.reportOperationFromWal(nonceGroup, nonce,
            entry.getKey().getWriteTime());
        }
        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
        List<MutationReplay> edits =
          WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
        if (coprocessorHost != null) {
          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
          // KeyValue.
          if (
            coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
              walEntry.getSecond())
          ) {
            // if bypass this log entry, ignore it ...
            continue;
          }
          walEntries.add(walEntry);
        }
        if (edits != null && !edits.isEmpty()) {
          // HBASE-17924
          // sort to improve lock efficiency
          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
          long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
            ? entry.getKey().getOrigSequenceNumber()
            : entry.getKey().getLogSequenceNumber();
          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
          // check if it's a partial success
          for (int i = 0; result != null && i < result.length; i++) {
            if (result[i] != OperationStatus.SUCCESS) {
              throw new IOException(result[i].getExceptionMsg());
            }
          }
        }
      }

      // sync wal at the end because ASYNC_WAL is used above
      WAL wal = region.getWAL();
      if (wal != null) {
        wal.sync();
      }

      if (coprocessorHost != null) {
        for (Pair<WALKey, WALEdit> entry : walEntries) {
          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
            entry.getSecond());
        }
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record replay latency even on failure.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
      }
    }
  }

  /**
   * Replay the given changes on a secondary replica
   */
  @Override
  public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
    ReplicateWALEntryRequest request) throws ServiceException {
    CellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
        throw new DoNotRetryIOException(
          "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
      }
      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException(
            "ReplicateToReplica request contains entries from multiple " + "regions. First region:"
              + regionName.toStringUtf8() + " , other region:"
              + entry.getKey().getEncodedRegionName());
        }
        region.replayWALEntry(entry, cells);
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  // Throws if the sync-replication state of the peer forbids applying these entries here.
  private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
    ReplicationSourceService replicationSource = server.getReplicationSourceService();
    if (replicationSource == null || entries.isEmpty()) {
      return;
    }
    // We can ensure that all entries are for one peer, so only need to check one entry's
    // table name. if the table hit sync replication at peer side and the peer cluster
    // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply
    // those entries according to the design doc.
    TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray());
    if (
      replicationSource.getSyncReplicationPeerInfoProvider().checkState(table,
        RejectReplicationRequestStateChecker.get())
    ) {
      throw new DoNotRetryIOException(
        "Reject to apply to sink cluster because sync replication state of sink cluster "
          + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table);
    }
  }

  /**
   * Replicate WAL entries on the region server.
2228 * @param controller the RPC controller 2229 * @param request the request 2230 */ 2231 @Override 2232 @QosPriority(priority = HConstants.REPLICATION_QOS) 2233 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, 2234 final ReplicateWALEntryRequest request) throws ServiceException { 2235 try { 2236 checkOpen(); 2237 if (server.getReplicationSinkService() != null) { 2238 requestCount.increment(); 2239 List<WALEntry> entries = request.getEntryList(); 2240 checkShouldRejectReplicationRequest(entries); 2241 CellScanner cellScanner = getAndReset(controller); 2242 server.getRegionServerCoprocessorHost().preReplicateLogEntries(); 2243 server.getReplicationSinkService().replicateLogEntries(entries, cellScanner, 2244 request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), 2245 request.getSourceHFileArchiveDirPath()); 2246 server.getRegionServerCoprocessorHost().postReplicateLogEntries(); 2247 return ReplicateWALEntryResponse.newBuilder().build(); 2248 } else { 2249 throw new ServiceException("Replication services are not initialized yet"); 2250 } 2251 } catch (IOException ie) { 2252 throw new ServiceException(ie); 2253 } 2254 } 2255 2256 /** 2257 * Roll the WAL writer of the region server. 2258 * @param controller the RPC controller 2259 * @param request the request n 2260 */ 2261 @Override 2262 public RollWALWriterResponse rollWALWriter(final RpcController controller, 2263 final RollWALWriterRequest request) throws ServiceException { 2264 try { 2265 checkOpen(); 2266 requestCount.increment(); 2267 server.getRegionServerCoprocessorHost().preRollWALWriterRequest(); 2268 server.getWalRoller().requestRollAll(); 2269 server.getRegionServerCoprocessorHost().postRollWALWriterRequest(); 2270 RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder(); 2271 return builder.build(); 2272 } catch (IOException ie) { 2273 throw new ServiceException(ie); 2274 } 2275 } 2276 2277 /** 2278 * Stop the region server. 
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public StopServerResponse stopServer(final RpcController controller,
    final StopServerRequest request) throws ServiceException {
    rpcPreCheck("stopServer");
    requestCount.increment();
    String reason = request.getReason();
    server.stop(reason);
    return StopServerResponse.newBuilder().build();
  }

  /**
   * Update the favored-nodes mapping for the regions named in the request. Entries that carry no
   * favored nodes are skipped.
   */
  @Override
  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
    UpdateFavoredNodesRequest request) throws ServiceException {
    rpcPreCheck("updateFavoredNodes");
    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
        server.updateRegionFavoredNodesMapping(hri.getEncodedName(),
          regionUpdateInfo.getFavoredNodesList());
      }
    }
    // NOTE(review): the response counts every entry in the request, including ones skipped above
    // because they had no favored nodes.
    respBuilder.setResponse(openInfoList.size());
    return respBuilder.build();
  }

  /**
   * Atomically bulk load several HFiles into an open region
   * @return true if successful, false is failed but recoverably (no action)
   * @throws ServiceException if failed unrecoverably
   */
  @Override
  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
    final BulkLoadHFileRequest request) throws ServiceException {
    long start = EnvironmentEdgeManager.currentTime();
    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
    if (clusterIds.contains(this.server.getClusterId())) {
      // This cluster has already seen the data (its id is in the list): report loaded so
      // replication bulk-load does not loop.
      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
    } else {
      // Record this cluster as having handled the data before forwarding the load.
      clusterIds.add(this.server.getClusterId());
    }
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
      // Negative means "size unknown / not computed"; see the comment below where it is consumed.
      long sizeToBeLoaded = -1;

      // Check to see if this bulk load would exceed the space quota for this table
      if (spaceQuotaEnabled) {
        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
        SpaceViolationPolicyEnforcement enforcement =
          activeSpaceQuotas.getPolicyEnforcement(region);
        if (enforcement != null) {
          // Bulk loads must still be atomic. We must enact all or none.
          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
          for (FamilyPath familyPath : request.getFamilyPathList()) {
            filePaths.add(familyPath.getPath());
          }
          // Check if the batch of files exceeds the current quota
          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
        }
      }
      // secure bulk load
      Map<byte[], List<Path>> map =
        server.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
      builder.setLoaded(map != null);
      if (map != null) {
        // Treat any negative size as a flag to "ignore" updating the region size as that is
        // not possible to occur in real life (cannot bulk load a file with negative size)
        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
              + sizeToBeLoaded + " bytes");
          }
          // Inform space quotas of the new files for this region
          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
            sizeToBeLoaded);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record bulk-load latency whether the load succeeded or failed.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
      }
    }
  }

  /**
   * Obtain a bulk-load token for the given region from the secure bulk load manager.
   */
  @Override
  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
    PrepareBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      String bulkToken = server.getSecureBulkLoadManager().prepareBulkLoad(region, request);
      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
      builder.setBulkToken(bulkToken);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Release the server-side state associated with a previously prepared bulk load.
   */
  @Override
  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
    CleanupBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      server.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
      return CleanupBulkLoadResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Execute a region-scoped coprocessor endpoint call.
   */
  @Override
  public CoprocessorServiceResponse execService(final RpcController controller,
    final CoprocessorServiceRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      Message result = execServiceOnRegion(region, request.getCall());
      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
        region.getRegionInfo().getRegionName()));
      // TODO: COPIES!!!!!!
      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
        org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Resolve the filesystem to compute bulk-load file sizes against: the server's own filesystem
   * when no paths are supplied, otherwise the filesystem of the first source path.
   */
  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
    if (filePaths.isEmpty()) {
      // local hdfs
      return server.getFileSystem();
    }
    // source hdfs
    return new Path(filePaths.get(0)).getFileSystem(server.getConfiguration());
  }

  /** Run a coprocessor endpoint call against the given region with a fresh server controller. */
  private Message execServiceOnRegion(HRegion region,
    final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
    // ignore the passed in controller (from the serialized call)
    ServerRpcController execController = new ServerRpcController();
    return region.execService(execController, serviceCall);
  }

  /**
   * @return true if client requests against this region should be rejected because the region's
   *         table is in a sync-replication state that disallows client writes on this cluster.
   */
  private boolean shouldRejectRequestsFromClient(HRegion region) {
    TableName table = region.getRegionInfo().getTable();
    ReplicationSourceService service = server.getReplicationSourceService();
    return service != null && service.getSyncReplicationPeerInfoProvider().checkState(table,
      RejectRequestsFromClientStateChecker.get());
  }

  /** Throw {@link DoNotRetryIOException} if the region is in sync-replication STANDBY state. */
  private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
    if (shouldRejectRequestsFromClient(region)) {
      throw new DoNotRetryIOException(
        region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state.");
    }
  }

  /**
   * Get data from a table.
   * @param controller the RPC controller
   * @param request    the get request
   */
  @Override
  public GetResponse get(final RpcController controller, final GetRequest request)
    throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    OperationQuota quota = null;
    HRegion region = null;
    try {
      checkOpen();
      requestCount.increment();
      rpcGetRequestCount.increment();
      region = getRegion(request.getRegion());
      rejectIfInStandByState(region);

      GetResponse.Builder builder = GetResponse.newBuilder();
      ClientProtos.Get get = request.getGet();
      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
          + "reverse Scan.");
      }
      Boolean existence = null;
      Result r = null;
      RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);

      Get clientGet = ProtobufUtil.toGet(get);
      // For existence-only gets, give the preExists coprocessor hook first say; a non-null
      // answer short-circuits the actual read.
      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
        existence = region.getCoprocessorHost().preExists(clientGet);
      }
      if (existence == null) {
        if (context != null) {
          r = get(clientGet, (region), null, context);
        } else {
          // for test purpose
          r = region.get(clientGet);
        }
        if (get.getExistenceOnly()) {
          boolean exists = r.getExists();
          if (region.getCoprocessorHost() != null) {
            exists = region.getCoprocessorHost().postExists(clientGet, exists);
          }
          existence = exists;
        }
      }
      if (existence != null) {
        ClientProtos.Result pbr =
          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
        builder.setResult(pbr);
      } else if (r != null) {
        ClientProtos.Result pbr;
        // Ship cells via the out-of-band cell block when the client supports it (1.3+).
        if (
          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
        ) {
          pbr = ProtobufUtil.toResultNoData(r);
          ((HBaseRpcController) controller)
            .setCellScanner(CellUtil.createCellScanner(r.rawCells()));
          addSize(context, r, null);
        } else {
          pbr = ProtobufUtil.toResult(r);
        }
        builder.setResult(pbr);
      }
      // r.cells is null when an table.exists(get) call
      if (r != null && r.rawCells() != null) {
        quota.addGetResult(r);
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record latency even on failure; the quota must always be released.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        TableDescriptor td = region != null ? region.getTableDescriptor() : null;
        if (td != null) {
          metricsRegionServer.updateGet(td.getTableName(),
            EnvironmentEdgeManager.currentTime() - before);
        }
      }
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Server-side Get implementation: converts the Get to a Scan so the resulting scanner can be
   * closed later by the RPC callback (either the call context or the supplied close callback).
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
    RpcCallContext context) throws IOException {
    region.prepareGet(get);
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    long before = EnvironmentEdgeManager.currentTime();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        region.metricsUpdateForGet(results, before);
        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
          stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet(results, before);

    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
  }

  /**
   * Count the total number of actions across all RegionActions in the request; warn when the
   * count exceeds the configured row-size threshold, and optionally reject the whole batch.
   * @throws ServiceException if the batch is over the threshold and rejection is enabled
   */
  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
    int sum = 0;
    String firstRegionName = null;
    for (RegionAction regionAction : request.getRegionActionList()) {
      if (sum == 0) {
        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
      }
      sum += regionAction.getActionCount();
    }
    if (sum > rowSizeWarnThreshold) {
      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
        + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
        + RpcServer.getRequestUserName().orElse(null) + "/"
        + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
      if (rejectRowsWithSizeOverThreshold) {
        throw new ServiceException(
          "Rejecting large batch operation for current batch with firstRegionName: "
            + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
            + rowSizeWarnThreshold);
      }
    }
  }

  /**
   * Record a whole-RegionAction failure on the response and skip past the cells belonging to the
   * failed action's mutations so later RegionActions still line up with their cells.
   */
  private void failRegionAction(MultiResponse.Builder responseBuilder,
    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
    CellScanner cellScanner, Throwable error) {
    rpcServer.getMetrics().exception(error);
    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
    // All Mutations in this RegionAction not executed as we can not see the Region online here
    // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
    // corresponding to these Mutations.
    if (cellScanner != null) {
      skipCellsForMutations(regionAction.getActionList(), cellScanner);
    }
  }

  /** @return true if the action is a put/delete carrying the replication marker attribute. */
  private boolean isReplicationRequest(Action action) {
    // replication request can only be put or delete.
    if (!action.hasMutation()) {
      return false;
    }
    MutationProto mutation = action.getMutation();
    MutationType type = mutation.getMutateType();
    if (type != MutationType.PUT && type != MutationType.DELETE) {
      return false;
    }
    // replication will set a special attribute so we can make use of it to decide whether a request
    // is for replication.
    return mutation.getAttributeList().stream().map(p -> p.getName())
      .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
  }

  /**
   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
   * @param rpcc    the RPC controller
   * @param request the multi request
   */
  @Override
  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
    throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }

    checkBatchSizeAndLogLargeSize(request);

    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
    if (controller != null) {
      controller.setCellScanner(null);
    }

    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;

    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
    this.rpcMultiRequestCount.increment();
    this.requestCount.increment();
    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();

    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
    // following logic is for backward compatibility as old clients still use
    // MultiRequest#condition in case of checkAndMutate with RowMutations.
    if (request.hasCondition()) {
      if (request.getRegionActionList().isEmpty()) {
        // If the region action list is empty, do nothing.
        responseBuilder.setProcessed(true);
        return responseBuilder.build();
      }

      RegionAction regionAction = request.getRegionAction(0);

      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
      // we can assume regionAction.getAtomic() is true here.
      assert regionAction.getAtomic();

      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        return responseBuilder.build();
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
        // We only allow replication in standby state and it will not set the atomic flag.
        if (rejectIfFromClient) {
          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
            new DoNotRetryIOException(
              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
          return responseBuilder.build();
        }

        try {
          CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
            cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
          responseBuilder.setProcessed(result.isSuccess());
          ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
            ClientProtos.ResultOrException.newBuilder();
          for (int i = 0; i < regionAction.getActionCount(); i++) {
            // To unify the response format with doNonAtomicRegionMutation and read through
            // client's AsyncProcess we have to add an empty result instance per operation
            resultOrExceptionOrBuilder.clear();
            resultOrExceptionOrBuilder.setIndex(i);
            regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
          }
        } catch (IOException e) {
          rpcServer.getMetrics().exception(e);
          // As it's an atomic operation with a condition, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        }
      } finally {
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
      }
      return responseBuilder.build();
    }

    // this will contain all the cells that we need to return. It's created later, if needed.
    List<CellScannable> cellsToReturn = null;
    RegionScannersCloseCallBack closeCallBack = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
      new HashMap<>(request.getRegionActionCount());

    // Main per-RegionAction loop: each RegionAction fails or succeeds independently.
    for (RegionAction regionAction : request.getRegionActionList()) {
      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();
      regionActionResultBuilder.clear();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        continue; // For this region it's a failure.
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);

        if (regionAction.hasCondition()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }

          try {
            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
              ClientProtos.ResultOrException.newBuilder();
            if (regionAction.getActionCount() == 1) {
              // Single-action checkAndMutate: result (if any) is reported at index 0.
              CheckAndMutateResult result =
                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              resultOrExceptionOrBuilder.setIndex(0);
              if (result.getResult() != null) {
                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
              }
              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
            } else {
              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              for (int i = 0; i < regionAction.getActionCount(); i++) {
                if (i == 0 && result.getResult() != null) {
                  // Set the result of the Increment/Append operations to the first element of the
                  // ResultOrException list
                  resultOrExceptionOrBuilder.setIndex(i);
                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
                  continue;
                }
                // To unify the response format with doNonAtomicRegionMutation and read through
                // client's AsyncProcess we have to add an empty result instance per operation
                resultOrExceptionOrBuilder.clear();
                resultOrExceptionOrBuilder.setIndex(i);
                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
              }
            }
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's an atomic operation with a condition, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }
          try {
            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
              cellScanner, nonceGroup, spaceQuotaEnforcement);
            regionActionResultBuilder.setProcessed(true);
            // We no longer use MultiResponse#processed. Instead, we use
            // RegionActionResult#processed. This is for backward compatibility for old clients.
            responseBuilder.setProcessed(true);
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's atomic, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else {
          if (
            rejectIfFromClient && regionAction.getActionCount() > 0
              && !isReplicationRequest(regionAction.getAction(0))
          ) {
            // fail if it is not a replication request
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              new DoNotRetryIOException(
                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
            continue;
          }
          // doNonAtomicRegionMutation manages the exception internally
          if (context != null && closeCallBack == null) {
            // An RpcCallBack that creates a list of scanners that needs to perform callBack
            // operation on completion of multiGets.
            // Set this only once
            closeCallBack = new RegionScannersCloseCallBack();
            context.setCallBack(closeCallBack);
          }
          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
            spaceQuotaEnforcement);
        }
      } finally {
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        regionStats.put(regionSpecifier, regionLoadStats);
      }
    }
    // Load the controller with the Cells to return.
    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
    }

    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
      builder.addRegion(stat.getKey());
      builder.addStat(stat.getValue());
    }
    responseBuilder.setRegionStatistics(builder);
    return responseBuilder.build();
  }

  /** Skip the cells in the scanner belonging to each of the given actions' mutations. */
  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    for (Action action : actions) {
      skipCellsForMutation(action, cellScanner);
    }
  }

  /**
   * Advance the cell scanner past the cells associated with one action's mutation, so subsequent
   * actions still line up with their own cells after a failure.
   */
  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    try {
      if (action.hasMutation()) {
        MutationProto m = action.getMutation();
        if (m.hasAssociatedCellCount()) {
          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
            cellScanner.advance();
          }
        }
      }
    } catch (IOException e) {
      // No need to handle these Individual Mutation level issue. Any way this entire RegionAction
      // marked as failed as we could not see the Region here. At client side the top level
      // RegionAction exception will be considered first.
      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
    }
  }

  /**
   * Mutate data in a table.
   * @param rpcc    the RPC controller
   * @param request the mutate request
   */
  @Override
  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
    throws ServiceException {
    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
    OperationQuota quota = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    // Clear scanner so we are not holding on to reference across call.
    if (controller != null) {
      controller.setCellScanner(null);
    }
    try {
      checkOpen();
      requestCount.increment();
      rpcMutateRequestCount.increment();
      HRegion region = getRegion(request.getRegion());
      rejectIfInStandByState(region);
      MutateResponse.Builder builder = MutateResponse.newBuilder();
      MutationProto mutation = request.getMutation();
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
      ActivePolicyEnforcement spaceQuotaEnforcement =
        getSpaceQuotaManager().getActiveEnforcements();

      if (request.hasCondition()) {
        // checkAndMutate path: the condition guards the single mutation.
        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
          request.getCondition(), nonceGroup, spaceQuotaEnforcement);
        builder.setProcessed(result.isSuccess());
        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
        if (clientCellBlockSupported) {
          addSize(context, result.getResult(), null);
        }
      } else {
        Result r = null;
        Boolean processed = null;
        MutationType type = mutation.getMutateType();
        switch (type) {
          case APPEND:
            // TODO: this doesn't actually check anything.
            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
            break;
          case INCREMENT:
            // TODO: this doesn't actually check anything.
            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
            break;
          case PUT:
            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          case DELETE:
            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          default:
            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
        }
        if (processed != null) {
          builder.setProcessed(processed);
        }
        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
        addResult(builder, r, controller, clientCellBlockSupported);
        if (clientCellBlockSupported) {
          addSize(context, r, null);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      server.checkFileSystem();
      throw new ServiceException(ie);
    } finally {
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Apply a single Put to the region after cell-size, space-quota and throughput-quota checks,
   * recording put latency on success.
   */
  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Put put = ProtobufUtil.toPut(mutation, cellScanner);
    checkCellSizeLimit(region, put);
    spaceQuota.getPolicyEnforcement(region).check(put);
    quota.addMutation(put);
    region.put(put);

    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updatePut(region.getRegionInfo().getTable(), after - before);
    }
  }

  /**
   * Apply a single Delete to the region after cell-size, space-quota and throughput-quota checks,
   * recording delete latency on success.
   */
  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
    checkCellSizeLimit(region, delete);
    spaceQuota.getPolicyEnforcement(region).check(delete);
    quota.addMutation(delete);
    region.delete(delete);

    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updateDelete(region.getRegionInfo().getTable(), after - before);
    }
  }

  /**
   * Executes a check-and-mutate operation against the given region: size/quota checks first, then
   * the coprocessor pre hook (which may short-circuit by returning a non-null result), then the
   * region-level checkAndMutate and the post hook. Also records per-table checkAndMutate metrics,
   * split by PUT/DELETE mutation type.
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
    quota.addMutation((Mutation) checkAndMutate.getAction());

    CheckAndMutateResult result = null;
    if (region.getCoprocessorHost() != null) {
      // A non-null result from the pre hook bypasses the region operation entirely.
      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
    }
    if (result == null) {
      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
      if (region.getCoprocessorHost() != null) {
        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
      }
    }
    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updateCheckAndMutate(region.getRegionInfo().getTable(), after - before);

      MutationType type = mutation.getMutateType();
      switch (type) {
        case PUT:
          metricsRegionServer.updateCheckAndPut(region.getRegionInfo().getTable(), after - before);
          break;
        case DELETE:
          metricsRegionServer.updateCheckAndDelete(region.getRegionInfo().getTable(),
            after - before);
          break;
        default:
          break;
      }
    }
    return result;
  }

  // This is used to keep compatible with the old client implementation. Consider remove it if we
  // decide to drop the support of the client that still sends close request to a region scanner
  // which has already been exhausted.
  @Deprecated
  private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {

    private static final long serialVersionUID = -4305297078988180130L;

    @Override
    public synchronized Throwable fillInStackTrace() {
      // Singleton sentinel exception; suppress the stack trace since it carries no context.
      return this;
    }
  };

  /**
   * Looks up the outstanding scanner for the given request's scanner id. Throws
   * {@link UnknownScannerException} if the id is unknown, the sentinel SCANNER_ALREADY_CLOSED if
   * the scanner was recently closed, and NotServingRegionException if the region instance behind
   * the scanner has changed (in which case the stale scanner is closed and its lease cancelled).
   */
  private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
    String scannerName = toScannerName(request.getScannerId());
    RegionScannerHolder rsh = this.scanners.get(scannerName);
    if (rsh == null) {
      // just ignore the next or close request if scanner does not exists.
      if (closedScanners.getIfPresent(scannerName) != null) {
        throw SCANNER_ALREADY_CLOSED;
      } else {
        LOG.warn("Client tried to access missing scanner " + scannerName);
        throw new UnknownScannerException(
          "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
            + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
            + "long wait between consecutive client checkins, c) Server may be closing down, "
            + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
            + "possible fix would be increasing the value of"
            + "'hbase.client.scanner.timeout.period' configuration.");
      }
    }
    rejectIfInStandByState(rsh.r);
    RegionInfo hri = rsh.s.getRegionInfo();
    // Yes, should be the same instance
    if (server.getOnlineRegion(hri.getRegionName()) != rsh.r) {
      String msg = "Region has changed on the scanner " + scannerName + ": regionName="
        + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r;
      LOG.warn(msg + ", closing...");
      scanners.remove(scannerName);
      try {
        rsh.s.close();
      } catch (IOException e) {
        LOG.warn("Getting exception closing " + scannerName, e);
      } finally {
        try {
          server.getLeaseManager().cancelLease(scannerName);
        } catch (LeaseException e) {
          LOG.warn("Getting exception closing " + scannerName, e);
        }
      }
      throw new NotServingRegionException(msg);
    }
    return rsh;
  }

  /**
   * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
   *         value.
   */
  private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request,
    ScanResponse.Builder builder) throws IOException {
    HRegion region = getRegion(request.getRegion());
    rejectIfInStandByState(region);
    ClientProtos.Scan protoScan = request.getScan();
    boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
    Scan scan = ProtobufUtil.toScan(protoScan);
    // if the request doesn't set this, get the default region setting.
    if (!isLoadingCfsOnDemandSet) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }

    if (!scan.hasFamilies()) {
      // Adding all families to scanner
      for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
        scan.addFamily(family);
      }
    }
    if (region.getCoprocessorHost() != null) {
      // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
      // wrapper for the core created RegionScanner
      region.getCoprocessorHost().preScannerOpen(scan);
    }
    RegionScannerImpl coreScanner = region.getScanner(scan);
    Shipper shipper = coreScanner;
    RegionScanner scanner = coreScanner;
    try {
      if (region.getCoprocessorHost() != null) {
        scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
      }
    } catch (Exception e) {
      // Although region coprocessor is for advanced users and they should take care of the
      // implementation to not damage the HBase system, closing the scanner on exception here does
      // not have any bad side effect, so let's do it
      scanner.close();
      throw e;
    }
    long scannerId = scannerIdGenerator.generateNewScannerId();
    builder.setScannerId(scannerId);
    builder.setMvccReadPoint(scanner.getMvccReadPoint());
    builder.setTtl(scannerLeaseTimeoutPeriod);
    String scannerName = toScannerName(scannerId);

    // System-table scans are never counted as full-region scans for metrics purposes.
    boolean fullRegionScan =
      !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region);

    return new Pair<String, RegionScannerHolder>(scannerName,
      addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan));
  }

  /**
   * The returned String is used as key doing look up of outstanding Scanners in this Servers'
   * this.scanners, the Map of outstanding scanners and their current state.
   * @param scannerId A scanner long id.
   * @return The long id as a String.
   */
  private static String toScannerName(long scannerId) {
    return Long.toString(scannerId);
  }

  /**
   * Verifies (and advances) the scanner's next-call sequence number against the one sent by the
   * client, to detect out-of-order or replayed scan RPCs.
   */
  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
    throws OutOfOrderScannerNextException {
    // if nextCallSeq does not match throw Exception straight away. This needs to be
    // performed even before checking of Lease.
    // See HBASE-5974
    if (request.hasNextCallSeq()) {
      long callSeq = request.getNextCallSeq();
      if (!rsh.incNextCallSeq(callSeq)) {
        throw new OutOfOrderScannerNextException(
          "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: "
            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
      }
    }
  }

  /**
   * Re-registers a scanner lease that was removed while its scan RPC was being processed.
   */
  private void addScannerLeaseBack(LeaseManager.Lease lease) {
    try {
      server.getLeaseManager().addLease(lease);
    } catch (LeaseStillHeldException e) {
      // should not happen as the scanner id is unique.
      throw new AssertionError(e);
    }
  }

  /**
   * Computes the absolute deadline (epoch millis) for this scan call, or -1 when no time limit
   * should be enforced (e.g. heartbeats disallowed or no positive timeout configured).
   */
  private long getTimeLimit(HBaseRpcController controller, boolean allowHeartbeatMessages) {
    // Set the time limit to be half of the more restrictive timeout value (one of the
    // timeout values must be positive). In the event that both values are positive, the
    // more restrictive of the two is used to calculate the limit.
    if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
      long timeLimitDelta;
      if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
        timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
      } else {
        timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ?
          scannerLeaseTimeoutPeriod : rpcTimeout;
      }
      if (controller != null && controller.getCallTimeout() > 0) {
        timeLimitDelta = Math.min(timeLimitDelta, controller.getCallTimeout());
      }
      // Use half of whichever timeout value was more restrictive... But don't allow
      // the time limit to be less than the allowable minimum (could cause an
      // immediate timeout before scanning any data).
      timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
      // XXX: Can not use EnvironmentEdge here because TestIncrementTimeRange use a
      // ManualEnvironmentEdge. Consider using System.nanoTime instead.
      return EnvironmentEdgeManager.currentTime() + timeLimitDelta;
    }
    // Default value of timeLimit is negative to indicate no timeLimit should be
    // enforced.
    return -1L;
  }

  /**
   * Flips moreResults to false on the response builder once the scan has produced
   * {@code limitOfRows} complete rows.
   */
  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
    ScannerContext scannerContext, ScanResponse.Builder builder) {
    if (numOfCompleteRows >= limitOfRows) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows
          + " scannerContext: " + scannerContext);
      }
      builder.setMoreResults(false);
    }
  }

  // Core scan loop: repeatedly calls RegionScanner#nextRaw under the per-scanner lock, honoring
  // size/time/row limits and heartbeat/partial-result negotiation, accumulating Results into
  // `results` and recording state (moreResultsInRegion, heartbeat, cursor, scan metrics) on the
  // response builder. Updates server-side scan metrics in the finally block, even on error.
  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
    long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
    ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCallContext context)
    throws IOException {
    HRegion region = rsh.r;
    RegionScanner scanner = rsh.s;
    long maxResultSize;
    if (scanner.getMaxResultSize() > 0) {
      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
    } else {
      maxResultSize = maxQuotaResultSize;
    }
    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
    // arbitrary 32. TODO: keep record of general size of results being returned.
    ArrayList<Cell> values = new ArrayList<>(32);
    region.startRegionOperation(Operation.SCAN);
    long before = EnvironmentEdgeManager.currentTime();
    // Used to check if we've matched the row limit set on the Scan
    int numOfCompleteRows = 0;
    // Count of times we call nextRaw; can be > numOfCompleteRows.
    int numOfNextRawCalls = 0;
    try {
      int numOfResults = 0;
      synchronized (scanner) {
        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
        boolean clientHandlesPartials =
          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
        boolean clientHandlesHeartbeats =
          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();

        // On the server side we must ensure that the correct ordering of partial results is
        // returned to the client to allow them to properly reconstruct the partial results.
        // If the coprocessor host is adding to the result list, we cannot guarantee the
        // correct ordering of partial results and so we prevent partial results from being
        // formed.
        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
        boolean moreRows = false;

        // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
        // certain time threshold on the server. When the time threshold is exceeded, the
        // server stops the scan and sends back whatever Results it has accumulated within
        // that time period (may be empty). Since heartbeat messages have the potential to
        // create partial Results (in the event that the timeout occurs in the middle of a
        // row), we must only generate heartbeat messages when the client can handle both
        // heartbeats AND partials
        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;

        long timeLimit = getTimeLimit(controller, allowHeartbeatMessages);

        final LimitScope sizeScope =
          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
        final LimitScope timeScope =
          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;

        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();

        // Configure with limits for this RPC. Set keep progress true since size progress
        // towards size limit should be kept between calls to nextRaw
        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
        // of heap size occupied by cells(being read). Cell data means its key and value parts.
        contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize);
        contextBuilder.setBatchLimit(scanner.getBatch());
        contextBuilder.setTimeLimit(timeScope, timeLimit);
        contextBuilder.setTrackMetrics(trackMetrics);
        ScannerContext scannerContext = contextBuilder.build();
        boolean limitReached = false;
        while (numOfResults < maxResults) {
          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
          // batch limit is a limit on the number of cells per Result. Thus, if progress is
          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
          // reset the batch progress between nextRaw invocations since we don't want the
          // batch progress from previous calls to affect future calls
          scannerContext.setBatchProgress(0);
          assert values.isEmpty();

          // Collect values to be returned here
          moreRows = scanner.nextRaw(values, scannerContext);
          if (context == null) {
            // When there is no RpcCallContext,copy EC to heap, then the scanner would close,
            // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
            // buffers.See more details in HBASE-26036.
            CellUtil.cloneIfNecessary(values);
          }
          numOfNextRawCalls++;

          if (!values.isEmpty()) {
            if (limitOfRows > 0) {
              // First we need to check if the last result is partial and we have a row change. If
              // so then we need to increase the numOfCompleteRows.
              if (results.isEmpty()) {
                if (
                  rsh.rowOfLastPartialResult != null
                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              } else {
                Result lastResult = results.get(results.size() - 1);
                if (
                  lastResult.mayHaveMoreCellsInRow()
                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              }
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
            lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
            results.add(r);
            numOfResults++;
            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
              numOfCompleteRows++;
              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
          } else if (!moreRows && !results.isEmpty()) {
            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
            // last result is false. Otherwise it's possible that: the first nextRaw returned
            // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
            // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
            // return with moreRows=false, which means moreResultsInRegion would be false, it will
            // be a contradictory state (HBASE-21206).
            int lastIdx = results.size() - 1;
            Result r = results.get(lastIdx);
            if (r.mayHaveMoreCellsInRow()) {
              results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
            }
          }
          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
          boolean resultsLimitReached = numOfResults >= maxResults;
          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

          if (limitReached || !moreRows) {
            // We only want to mark a ScanResponse as a heartbeat message in the event that
            // there are more values to be read server side. If there aren't more values,
            // marking it as a heartbeat is wasteful because the client will need to issue
            // another ScanRequest only to realize that they already have all the values
            if (moreRows && timeLimitReached) {
              // Heartbeat messages occur when the time limit has been reached.
              builder.setHeartbeatMessage(true);
              if (rsh.needCursor) {
                Cell cursorCell = scannerContext.getLastPeekedCell();
                if (cursorCell != null) {
                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
                }
              }
            }
            break;
          }
          values.clear();
        }
        builder.setMoreResultsInRegion(moreRows);
        // Check to see if the client requested that we track metrics server side. If the
        // client requested metrics, retrieve the metrics from the scanner context.
        if (trackMetrics) {
          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();

          for (Entry<String, Long> entry : metrics.entrySet()) {
            pairBuilder.setName(entry.getKey());
            pairBuilder.setValue(entry.getValue());
            metricBuilder.addMetrics(pairBuilder.build());
          }

          builder.setScanMetrics(metricBuilder.build());
        }
      }
    } finally {
      region.closeRegionOperation();
      // Update serverside metrics, even on error.
      long end = EnvironmentEdgeManager.currentTime();
      long responseCellSize = context != null ? context.getResponseCellSize() : 0;
      region.getMetrics().updateScanTime(end - before);
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateScanSize(region.getTableDescriptor().getTableName(),
          responseCellSize);
        metricsRegionServer.updateScanTime(region.getTableDescriptor().getTableName(),
          end - before);
        metricsRegionServer.updateReadQueryMeter(region.getRegionInfo().getTable(),
          numOfNextRawCalls);
      }
    }
    // coprocessor postNext hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
    }
  }

  /**
   * Scan data in a table.
   * @param controller the RPC controller
   * @param request the scan request
   */
  @Override
  public ScanResponse scan(final RpcController controller, final ScanRequest request)
    throws ServiceException {
    if (controller != null && !(controller instanceof HBaseRpcController)) {
      throw new UnsupportedOperationException(
        "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
    }
    if (!request.hasScannerId() && !request.hasScan()) {
      throw new ServiceException(
        new DoNotRetryIOException("Missing required input: scannerId or scan"));
    }
    try {
      checkOpen();
    } catch (IOException e) {
      // Server is going down: best-effort cancel of the scanner's lease before failing the call.
      if (request.hasScannerId()) {
        String scannerName = toScannerName(request.getScannerId());
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Server shutting down and client tried to access missing scanner " + scannerName);
        }
        final LeaseManager leaseManager = server.getLeaseManager();
        if (leaseManager != null) {
          try {
            leaseManager.cancelLease(scannerName);
          } catch (LeaseException le) {
            // No problem, ignore
            if (LOG.isTraceEnabled()) {
              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
            }
          }
        }
      }
      throw new ServiceException(e);
    }
    requestCount.increment();
    rpcScanRequestCount.increment();
    RegionScannerHolder rsh;
    ScanResponse.Builder builder = ScanResponse.newBuilder();
    String scannerName;
    try {
      if (request.hasScannerId()) {
        // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
        // for more details.
        long scannerId = request.getScannerId();
        builder.setScannerId(scannerId);
        scannerName = toScannerName(scannerId);
        rsh = getRegionScanner(request);
      } else {
        Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder);
        scannerName = scannerNameAndRSH.getFirst();
        rsh = scannerNameAndRSH.getSecond();
      }
    } catch (IOException e) {
      if (e == SCANNER_ALREADY_CLOSED) {
        // Now we will close scanner automatically if there are no more results for this region but
        // the old client will still send a close request to us. Just ignore it and return.
        return builder.build();
      }
      throw new ServiceException(e);
    }
    if (rsh.fullRegionScan) {
      rpcFullScanRequestCount.increment();
    }
    HRegion region = rsh.r;
    LeaseManager.Lease lease;
    try {
      // Remove lease while its being processed in server; protects against case
      // where processing of request takes > lease expiration time. or null if none found.
      lease = server.getLeaseManager().removeLease(scannerName);
    } catch (LeaseException e) {
      throw new ServiceException(e);
    }
    if (request.hasRenew() && request.getRenew()) {
      // add back and return
      addScannerLeaseBack(lease);
      try {
        checkScanNextCallSeq(request, rsh);
      } catch (OutOfOrderScannerNextException e) {
        throw new ServiceException(e);
      }
      return builder.build();
    }
    OperationQuota quota;
    try {
      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
    } catch (IOException e) {
      addScannerLeaseBack(lease);
      throw new ServiceException(e);
    }
    try {
      checkScanNextCallSeq(request, rsh);
    } catch (OutOfOrderScannerNextException e) {
      addScannerLeaseBack(lease);
      throw new ServiceException(e);
    }
    // Now we have increased the next call sequence. If we give client an error, the retry will
    // never success. So we'd better close the scanner and return a DoNotRetryIOException to client
    // and then client will try to open a new scanner.
    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
    int rows; // this is scan.getCaching
    if (request.hasNumberOfRows()) {
      rows = request.getNumberOfRows();
    } else {
      rows = closeScanner ? 0 : 1;
    }
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    // now let's do the real scan.
    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
    RegionScanner scanner = rsh.s;
    // this is the limit of rows for this scan, if we the number of rows reach this value, we will
    // close the scanner.
    int limitOfRows;
    if (request.hasLimitOfRows()) {
      limitOfRows = request.getLimitOfRows();
    } else {
      limitOfRows = -1;
    }
    MutableObject<Object> lastBlock = new MutableObject<>();
    boolean scannerClosed = false;
    try {
      List<Result> results = new ArrayList<>(Math.min(rows, 512));
      if (rows > 0) {
        boolean done = false;
        // Call coprocessor. Get region info from scanner.
        if (region.getCoprocessorHost() != null) {
          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
          if (!results.isEmpty()) {
            for (Result r : results) {
              lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
            }
          }
          if (bypass != null && bypass.booleanValue()) {
            done = true;
          }
        }
        if (!done) {
          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
            results, builder, lastBlock, context);
        } else {
          builder.setMoreResultsInRegion(!results.isEmpty());
        }
      } else {
        // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
        builder.setMoreResultsInRegion(true);
      }

      quota.addScanResult(results);
      addResults(builder, results, (HBaseRpcController) controller,
        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
        isClientCellBlockSupport(context));
      if (scanner.isFilterDone() && results.isEmpty()) {
        // If the scanner's filter - if any - is done with the scan
        // only set moreResults to false if the results is empty. This is used to keep compatible
        // with the old scan implementation where we just ignore the returned results if moreResults
        // is false. Can remove the isEmpty check after we get rid of the old implementation.
        builder.setMoreResults(false);
      }
      // Later we may close the scanner depending on this flag so here we need to make sure that we
      // have already set this flag.
      assert builder.hasMoreResultsInRegion();
      // we only set moreResults to false in the above code, so set it to true if we haven't set it
      // yet.
      if (!builder.hasMoreResults()) {
        builder.setMoreResults(true);
      }
      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
        // Record the last cell of the last result if it is a partial result
        // We need this to calculate the complete rows we have returned to client as the
        // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
        // current row. We may filter out all the remaining cells for the current row and just
        // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
        // check for row change.
        Result lastResult = results.get(results.size() - 1);
        if (lastResult.mayHaveMoreCellsInRow()) {
          rsh.rowOfLastPartialResult = lastResult.getRow();
        } else {
          rsh.rowOfLastPartialResult = null;
        }
      }
      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
        scannerClosed = true;
        closeScanner(region, scanner, scannerName, context);
      }
      return builder.build();
    } catch (IOException e) {
      try {
        // scanner is closed here
        scannerClosed = true;
        // The scanner state might be left in a dirty state, so we will tell the Client to
        // fail this RPC and close the scanner while opening up another one from the start of
        // row that the client has last seen.
        closeScanner(region, scanner, scannerName, context);

        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
        // used in two different semantics.
        // (1) The first is to close the client scanner and bubble up the exception all the way
        // to the application. This is preferred when the exception is really un-recoverable
        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
        // bucket usually.
        // (2) Second semantics is to close the current region scanner only, but continue the
        // client scanner by overriding the exception. This is usually UnknownScannerException,
        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
        // application-level ClientScanner has to continue without bubbling up the exception to
        // the client. See ClientScanner code to see how it deals with these special exceptions.
        if (e instanceof DoNotRetryIOException) {
          throw e;
        }

        // If it is a FileNotFoundException, wrap as a
        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
        if (e instanceof FileNotFoundException) {
          throw new DoNotRetryIOException(e);
        }

        // We closed the scanner already. Instead of throwing the IOException, and client
        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
        // a special exception to save an RPC.
        if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
          // 1.4.0+ clients know how to handle
          throw new ScannerResetException("Scanner is closed on the server-side", e);
        } else {
          // older clients do not know about SRE. Just throw USE, which they will handle
          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
            + " scanner state for clients older than 1.3.", e);
        }
      } catch (IOException ioe) {
        throw new ServiceException(ioe);
      }
    } finally {
      if (!scannerClosed) {
        // Adding resets expiration time on lease.
        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
        if (context != null) {
          context.setCallBack(rsh.shippedCallback);
        } else {
          // If context is null,here we call rsh.shippedCallback directly to reuse the logic in
          // rsh.shippedCallback to release the internal resources in rsh,and lease is also added
          // back to regionserver's LeaseManager in rsh.shippedCallback.
          runShippedCallback(rsh);
        }
      }
      quota.close();
    }
  }

  /**
   * Runs the holder's shipped callback immediately (used when there is no RpcCallContext to attach
   * it to), wrapping any IOException in a ServiceException.
   */
  private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
    assert rsh.shippedCallback != null;
    try {
      rsh.shippedCallback.run();
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
  }

  /**
   * Closes the named scanner: honors the coprocessor preScannerClose bypass, removes the holder
   * from the outstanding-scanner map, closes it (or defers close to the call's closeCallBack when
   * an RpcCallContext is present), and remembers it in closedScanners for old-client compat.
   */
  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
    RpcCallContext context) throws IOException {
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preScannerClose(scanner)) {
        // bypass the actual close.
        return;
      }
    }
    RegionScannerHolder rsh = scanners.remove(scannerName);
    if (rsh != null) {
      if (context != null) {
        context.setCallBack(rsh.closeCallBack);
      } else {
        rsh.s.close();
      }
      if (region.getCoprocessorHost() != null) {
        region.getCoprocessorHost().postScannerClose(scanner);
      }
      closedScanners.put(scannerName, scannerName);
    }
  }

  @Override
  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
    CoprocessorServiceRequest request) throws ServiceException {
    rpcPreCheck("execRegionServerService");
    return server.execRegionServerService(controller, request);
  }

  @Override
  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
    GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
    try {
      final RegionServerSpaceQuotaManager manager = server.getRegionServerSpaceQuotaManager();
      final GetSpaceQuotaSnapshotsResponse.Builder builder =
        GetSpaceQuotaSnapshotsResponse.newBuilder();
      if (manager != null) {
        final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
        for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
            .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
            .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build());
        }
      }
      return builder.build();
    } catch (Exception e) {
      throw new ServiceException(e);
    }
  }

  @Override
  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
    ClearRegionBlockCacheRequest request) throws ServiceException {
    rpcPreCheck("clearRegionBlockCache");
    ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder();
    CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
    List<HRegion> regions = getRegions(request.getRegionList(), stats);
    for (HRegion region : regions) {
      try {
        stats = stats.append(this.server.clearRegionBlockCache(region));
      } catch (Exception e) {
        // Record the per-region failure in the stats instead of failing the whole request.
        stats.addException(region.getRegionInfo().getRegionName(), e);
      }
    }
    stats.withMaxCacheSize(server.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
    return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
  }

  /**
   * Schedules an AssignRegionHandler for every region in the OpenRegionRequest, caching
   * TableDescriptors in {@code tdCache} to avoid repeated lookups within one request.
   */
  private void executeOpenRegionProcedures(OpenRegionRequest request,
    Map<TableName, TableDescriptor> tdCache) {
    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableName tableName = regionInfo.getTable();
      TableDescriptor tableDesc = tdCache.get(tableName);
      if (tableDesc == null) {
        try {
          tableDesc = server.getTableDescriptors().get(regionInfo.getTable());
        } catch (IOException e) {
          // Here we do not fail the whole method since we also need deal with other
          // procedures, and we can not ignore this one, so we still schedule a
          // AssignRegionHandler and it will report back to master if we still can not get the
          // TableDescriptor.
          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
            regionInfo.getTable(), e);
        }
        if (tableDesc != null) {
          tdCache.put(tableName, tableDesc);
        }
      }
      if (regionOpenInfo.getFavoredNodesCount() > 0) {
        server.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
          regionOpenInfo.getFavoredNodesList());
      }
      long procId = regionOpenInfo.getOpenProcId();
      if (server.submitRegionProcedure(procId)) {
        server.getExecutorService().submit(
          AssignRegionHandler.create(server, regionInfo, procId, tableDesc, masterSystemTime));
      }
    }
  }

  /**
   * Schedules an UnassignRegionHandler for the region named in the CloseRegionRequest.
   */
  private void executeCloseRegionProcedures(CloseRegionRequest request) {
    String encodedName;
    try {
      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
    } catch (DoNotRetryIOException e) {
      throw new UncheckedIOException("Should not happen", e);
    }
    ServerName destination = request.hasDestinationServer()
      ? ProtobufUtil.toServerName(request.getDestinationServer())
      : null;
    long procId = request.getCloseProcId();
    if (server.submitRegionProcedure(procId)) {
      server.getExecutorService()
        .submit(UnassignRegionHandler.create(server, encodedName, procId, false, destination));
    }
  }

  /**
   * Instantiates the RSProcedureCallable named in the request via reflection and hands it to the
   * server for execution; instantiation failures are reported back to the master rather than
   * thrown.
   */
  private void executeProcedures(RemoteProcedureRequest request) {
    RSProcedureCallable callable;
    try {
      callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
        .getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(),
        request.getProcId(), e);
      server.remoteProcedureComplete(request.getProcId(), e);
      return;
    }
    callable.init(request.getProcData().toByteArray(), server);
    LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
    server.executeProcedure(request.getProcId(), callable);
  }

  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ExecuteProceduresResponse executeProcedures(RpcController controller,
    ExecuteProceduresRequest request) throws ServiceException {
    try {
      checkOpen();
      throwOnWrongStartCode(request);
      server.getRegionServerCoprocessorHost().preExecuteProcedures();
      if (request.getOpenRegionCount() > 0) {
        // Avoid reading from the TableDescritor every time(usually it will read from the file
        // system)
        Map<TableName, TableDescriptor> tdCache = new HashMap<>();
        request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
      }
      if (request.getCloseRegionCount() > 0) {
        request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
      }
      if (request.getProcCount() > 0) {
        request.getProcList().forEach(this::executeProcedures);
      }
server.getRegionServerCoprocessorHost().postExecuteProcedures(); 3889 return ExecuteProceduresResponse.getDefaultInstance(); 3890 } catch (IOException e) { 3891 throw new ServiceException(e); 3892 } 3893 } 3894 3895 @Override 3896 public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller, 3897 GetAllBootstrapNodesRequest request) throws ServiceException { 3898 GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder(); 3899 server.getBootstrapNodes() 3900 .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server))); 3901 return builder.build(); 3902 } 3903}