001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.UncheckedIOException; 024import java.net.BindException; 025import java.net.InetAddress; 026import java.net.InetSocketAddress; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.Iterator; 032import java.util.List; 033import java.util.Map; 034import java.util.Map.Entry; 035import java.util.NavigableMap; 036import java.util.Set; 037import java.util.TreeSet; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentMap; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicBoolean; 042import java.util.concurrent.atomic.AtomicLong; 043import java.util.concurrent.atomic.LongAdder; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.fs.FileSystem; 046import org.apache.hadoop.fs.Path; 047import org.apache.hadoop.hbase.CacheEvictionStats; 048import 
org.apache.hadoop.hbase.CacheEvictionStatsBuilder; 049import org.apache.hadoop.hbase.Cell; 050import org.apache.hadoop.hbase.CellScannable; 051import org.apache.hadoop.hbase.CellScanner; 052import org.apache.hadoop.hbase.CellUtil; 053import org.apache.hadoop.hbase.DoNotRetryIOException; 054import org.apache.hadoop.hbase.DroppedSnapshotException; 055import org.apache.hadoop.hbase.HBaseIOException; 056import org.apache.hadoop.hbase.HBaseRpcServicesBase; 057import org.apache.hadoop.hbase.HConstants; 058import org.apache.hadoop.hbase.MultiActionResultTooLarge; 059import org.apache.hadoop.hbase.NotServingRegionException; 060import org.apache.hadoop.hbase.PrivateCellUtil; 061import org.apache.hadoop.hbase.RegionTooBusyException; 062import org.apache.hadoop.hbase.Server; 063import org.apache.hadoop.hbase.ServerName; 064import org.apache.hadoop.hbase.TableName; 065import org.apache.hadoop.hbase.UnknownScannerException; 066import org.apache.hadoop.hbase.client.Append; 067import org.apache.hadoop.hbase.client.CheckAndMutate; 068import org.apache.hadoop.hbase.client.CheckAndMutateResult; 069import org.apache.hadoop.hbase.client.Delete; 070import org.apache.hadoop.hbase.client.Durability; 071import org.apache.hadoop.hbase.client.Get; 072import org.apache.hadoop.hbase.client.Increment; 073import org.apache.hadoop.hbase.client.Mutation; 074import org.apache.hadoop.hbase.client.OperationWithAttributes; 075import org.apache.hadoop.hbase.client.Put; 076import org.apache.hadoop.hbase.client.RegionInfo; 077import org.apache.hadoop.hbase.client.RegionReplicaUtil; 078import org.apache.hadoop.hbase.client.Result; 079import org.apache.hadoop.hbase.client.Row; 080import org.apache.hadoop.hbase.client.Scan; 081import org.apache.hadoop.hbase.client.TableDescriptor; 082import org.apache.hadoop.hbase.client.VersionInfoUtil; 083import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 084import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 085import 
org.apache.hadoop.hbase.exceptions.ScannerResetException; 086import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 087import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 088import org.apache.hadoop.hbase.io.ByteBuffAllocator; 089import org.apache.hadoop.hbase.io.hfile.BlockCache; 090import org.apache.hadoop.hbase.ipc.HBaseRpcController; 091import org.apache.hadoop.hbase.ipc.PriorityFunction; 092import org.apache.hadoop.hbase.ipc.QosPriority; 093import org.apache.hadoop.hbase.ipc.RpcCall; 094import org.apache.hadoop.hbase.ipc.RpcCallContext; 095import org.apache.hadoop.hbase.ipc.RpcCallback; 096import org.apache.hadoop.hbase.ipc.RpcServer; 097import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 098import org.apache.hadoop.hbase.ipc.RpcServerFactory; 099import org.apache.hadoop.hbase.ipc.RpcServerInterface; 100import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 101import org.apache.hadoop.hbase.ipc.ServerRpcController; 102import org.apache.hadoop.hbase.net.Address; 103import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 104import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; 105import org.apache.hadoop.hbase.quotas.OperationQuota; 106import org.apache.hadoop.hbase.quotas.QuotaUtil; 107import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 108import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 109import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 110import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; 111import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease; 112import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException; 113import org.apache.hadoop.hbase.regionserver.Region.Operation; 114import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 115import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 116import 
org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler; 117import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; 118import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; 119import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; 120import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; 121import org.apache.hadoop.hbase.replication.ReplicationUtils; 122import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker; 123import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker; 124import org.apache.hadoop.hbase.security.Superusers; 125import org.apache.hadoop.hbase.security.access.Permission; 126import org.apache.hadoop.hbase.util.Bytes; 127import org.apache.hadoop.hbase.util.DNS; 128import org.apache.hadoop.hbase.util.DNS.ServerType; 129import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 130import org.apache.hadoop.hbase.util.Pair; 131import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 132import org.apache.hadoop.hbase.wal.WAL; 133import org.apache.hadoop.hbase.wal.WALEdit; 134import org.apache.hadoop.hbase.wal.WALKey; 135import org.apache.hadoop.hbase.wal.WALSplitUtil; 136import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 137import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 138import org.apache.yetus.audience.InterfaceAudience; 139import org.slf4j.Logger; 140import org.slf4j.LoggerFactory; 141 142import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 143import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 144import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 145import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 146import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 147import org.apache.hbase.thirdparty.com.google.protobuf.Message; 148import 
org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 149import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 150import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 151import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 152import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 153 154import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 155import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 156import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 171import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 191import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 212import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 234import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;

/**
 * Implements the regionserver RPC services.
 */
@InterfaceAudience.Private
public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
  implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface {

  private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);

  /** RPC scheduler to use for the region server. */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Upper bound applied to response sizes (combined with the quota's read-available budget).
  // NOTE(review): initialized outside this view — presumably from config in the constructor.
  private final long maxScannerResultSize;

  // Generates unique ids for client scanners; assigned outside this view.
  private ScannerIdGenerator scannerIdGenerator;
  // All currently open client scanners, keyed by scanner name.
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
  // which may send next or close request to a region scanner which has already been exhausted. The
  // entries will be removed automatically after scannerLeaseTimeoutPeriod.
  private final Cache<String, String> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;
  /*
   * Whether we should reject requests with very high no of rows i.e. beyond threshold defined by
   * rowSizeWarnThreshold
   */
  private final boolean rejectRowsWithSizeOverThreshold;

  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below booleans
   * to selectively enable/disable either Admin or Client Service (Rare is the case where you would
   * ever turn off one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
    "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
    "hbase.regionserver.client.executorService";
  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
    "hbase.regionserver.client.meta.executorService";

  /**
   * An Rpc callback for closing a RegionScanner.
349 */ 350 private static final class RegionScannerCloseCallBack implements RpcCallback { 351 352 private final RegionScanner scanner; 353 354 public RegionScannerCloseCallBack(RegionScanner scanner) { 355 this.scanner = scanner; 356 } 357 358 @Override 359 public void run() throws IOException { 360 this.scanner.close(); 361 } 362 } 363 364 /** 365 * An Rpc callback for doing shipped() call on a RegionScanner. 366 */ 367 private class RegionScannerShippedCallBack implements RpcCallback { 368 private final String scannerName; 369 private final Shipper shipper; 370 private final Lease lease; 371 372 public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) { 373 this.scannerName = scannerName; 374 this.shipper = shipper; 375 this.lease = lease; 376 } 377 378 @Override 379 public void run() throws IOException { 380 this.shipper.shipped(); 381 // We're done. On way out re-add the above removed lease. The lease was temp removed for this 382 // Rpc call and we are at end of the call now. Time to add it back. 383 if (scanners.containsKey(scannerName)) { 384 if (lease != null) { 385 server.getLeaseManager().addLease(lease); 386 } 387 } 388 } 389 } 390 391 /** 392 * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on 393 * completion of multiGets. 394 */ 395 static class RegionScannersCloseCallBack implements RpcCallback { 396 private final List<RegionScanner> scanners = new ArrayList<>(); 397 398 public void addScanner(RegionScanner scanner) { 399 this.scanners.add(scanner); 400 } 401 402 @Override 403 public void run() { 404 for (RegionScanner scanner : scanners) { 405 try { 406 scanner.close(); 407 } catch (IOException e) { 408 LOG.error("Exception while closing the scanner " + scanner, e); 409 } 410 } 411 } 412 } 413 414 /** 415 * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together. 
416 */ 417 static final class RegionScannerHolder { 418 private final AtomicLong nextCallSeq = new AtomicLong(0); 419 private final RegionScanner s; 420 private final HRegion r; 421 private final RpcCallback closeCallBack; 422 private final RpcCallback shippedCallback; 423 private byte[] rowOfLastPartialResult; 424 private boolean needCursor; 425 private boolean fullRegionScan; 426 private final String clientIPAndPort; 427 private final String userName; 428 429 RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack, 430 RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan, 431 String clientIPAndPort, String userName) { 432 this.s = s; 433 this.r = r; 434 this.closeCallBack = closeCallBack; 435 this.shippedCallback = shippedCallback; 436 this.needCursor = needCursor; 437 this.fullRegionScan = fullRegionScan; 438 this.clientIPAndPort = clientIPAndPort; 439 this.userName = userName; 440 } 441 442 long getNextCallSeq() { 443 return nextCallSeq.get(); 444 } 445 446 boolean incNextCallSeq(long currentSeq) { 447 // Use CAS to prevent multiple scan request running on the same scanner. 448 return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1); 449 } 450 451 // Should be called only when we need to print lease expired messages otherwise 452 // cache the String once made. 453 @Override 454 public String toString() { 455 return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName 456 + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString(); 457 } 458 } 459 460 /** 461 * Instantiated as a scanner lease. 
 If the lease times out, the scanner is closed
   */
  private class ScannerListener implements LeaseListener {
    private final String scannerName;

    ScannerListener(final String n) {
      this.scannerName = n;
    }

    @Override
    public void leaseExpired() {
      // Remove first so no new next() can race with the close below.
      RegionScannerHolder rsh = scanners.remove(this.scannerName);
      if (rsh == null) {
        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
        return;
      }
      LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
      server.getMetrics().incrScannerLeaseExpired();
      RegionScanner s = rsh.s;
      HRegion region = null;
      try {
        region = server.getRegion(s.getRegionInfo().getRegionName());
        if (region != null && region.getCoprocessorHost() != null) {
          region.getCoprocessorHost().preScannerClose(s);
        }
      } catch (IOException e) {
        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
      } finally {
        // Close the scanner even if the pre-hook failed; run the post-hook only after close.
        try {
          s.close();
          if (region != null && region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().postScannerClose(s);
          }
        } catch (IOException e) {
          LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
        }
      }
    }
  }

  /** Wraps a proto Result into a ResultOrException positioned at {@code index}. */
  private static ResultOrException getResultOrException(final ClientProtos.Result r,
    final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(r), index);
  }

  /** Wraps an Exception into a ResultOrException positioned at {@code index}. */
  private static ResultOrException getResultOrException(final Exception e, final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(e), index);
  }

  /** Stamps {@code index} on the builder and builds. */
  private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
    final int index) {
    return builder.setIndex(index).build();
  }

  /**
   * Checks for the following pre-checks in order:
   * <ol>
   * <li>RegionServer is running</li>
   * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
   * </ol>
* @param requestName name of rpc request. Used in reporting failures to provide context. 522 * @throws ServiceException If any of the above listed pre-check fails. 523 */ 524 private void rpcPreCheck(String requestName) throws ServiceException { 525 try { 526 checkOpen(); 527 requirePermission(requestName, Permission.Action.ADMIN); 528 } catch (IOException ioe) { 529 throw new ServiceException(ioe); 530 } 531 } 532 533 private boolean isClientCellBlockSupport(RpcCallContext context) { 534 return context != null && context.isClientCellBlockSupported(); 535 } 536 537 private void addResult(final MutateResponse.Builder builder, final Result result, 538 final HBaseRpcController rpcc, boolean clientCellBlockSupported) { 539 if (result == null) return; 540 if (clientCellBlockSupported) { 541 builder.setResult(ProtobufUtil.toResultNoData(result)); 542 rpcc.setCellScanner(result.cellScanner()); 543 } else { 544 ClientProtos.Result pbr = ProtobufUtil.toResult(result); 545 builder.setResult(pbr); 546 } 547 } 548 549 private void addResults(ScanResponse.Builder builder, List<Result> results, 550 HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) { 551 builder.setStale(!isDefaultRegion); 552 if (results.isEmpty()) { 553 return; 554 } 555 if (clientCellBlockSupported) { 556 for (Result res : results) { 557 builder.addCellsPerResult(res.size()); 558 builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow()); 559 } 560 controller.setCellScanner(CellUtil.createCellScanner(results)); 561 } else { 562 for (Result res : results) { 563 ClientProtos.Result pbr = ProtobufUtil.toResult(res); 564 builder.addResults(pbr); 565 } 566 } 567 } 568 569 private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions, 570 CellScanner cellScanner, Condition condition, long nonceGroup, 571 ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { 572 int countOfCompleteMutation = 0; 573 try { 574 if 
(!region.getRegionInfo().isMetaRegion()) { 575 server.getMemStoreFlusher().reclaimMemStoreMemory(); 576 } 577 List<Mutation> mutations = new ArrayList<>(); 578 long nonce = HConstants.NO_NONCE; 579 for (ClientProtos.Action action : actions) { 580 if (action.hasGet()) { 581 throw new DoNotRetryIOException( 582 "Atomic put and/or delete only, not a Get=" + action.getGet()); 583 } 584 MutationProto mutation = action.getMutation(); 585 MutationType type = mutation.getMutateType(); 586 switch (type) { 587 case PUT: 588 Put put = ProtobufUtil.toPut(mutation, cellScanner); 589 ++countOfCompleteMutation; 590 checkCellSizeLimit(region, put); 591 spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); 592 mutations.add(put); 593 break; 594 case DELETE: 595 Delete del = ProtobufUtil.toDelete(mutation, cellScanner); 596 ++countOfCompleteMutation; 597 spaceQuotaEnforcement.getPolicyEnforcement(region).check(del); 598 mutations.add(del); 599 break; 600 case INCREMENT: 601 Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner); 602 nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 603 ++countOfCompleteMutation; 604 checkCellSizeLimit(region, increment); 605 spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment); 606 mutations.add(increment); 607 break; 608 case APPEND: 609 Append append = ProtobufUtil.toAppend(mutation, cellScanner); 610 nonce = mutation.hasNonce() ? 
mutation.getNonce() : HConstants.NO_NONCE; 611 ++countOfCompleteMutation; 612 checkCellSizeLimit(region, append); 613 spaceQuotaEnforcement.getPolicyEnforcement(region).check(append); 614 mutations.add(append); 615 break; 616 default: 617 throw new DoNotRetryIOException("invalid mutation type : " + type); 618 } 619 } 620 621 if (mutations.size() == 0) { 622 return new CheckAndMutateResult(true, null); 623 } else { 624 CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations); 625 CheckAndMutateResult result = null; 626 if (region.getCoprocessorHost() != null) { 627 result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate); 628 } 629 if (result == null) { 630 result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce); 631 if (region.getCoprocessorHost() != null) { 632 result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result); 633 } 634 } 635 return result; 636 } 637 } finally { 638 // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner 639 // even if the malformed cells are not skipped. 640 for (int i = countOfCompleteMutation; i < actions.size(); ++i) { 641 skipCellsForMutation(actions.get(i), cellScanner); 642 } 643 } 644 } 645 646 /** 647 * Execute an append mutation. 648 * @return result to return to client if default operation should be bypassed as indicated by 649 * RegionObserver, null otherwise 650 */ 651 private Result append(final HRegion region, final OperationQuota quota, 652 final MutationProto mutation, final CellScanner cellScanner, long nonceGroup, 653 ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException { 654 long before = EnvironmentEdgeManager.currentTime(); 655 Append append = ProtobufUtil.toAppend(mutation, cellScanner); 656 checkCellSizeLimit(region, append); 657 spaceQuota.getPolicyEnforcement(region).check(append); 658 quota.addMutation(append); 659 long blockBytesScannedBefore = context != null ? 
context.getBlockBytesScanned() : 0; 660 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 661 Result r = region.append(append, nonceGroup, nonce); 662 if (server.getMetrics() != null) { 663 long blockBytesScanned = 664 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 665 server.getMetrics().updateAppend(region, EnvironmentEdgeManager.currentTime() - before, 666 blockBytesScanned); 667 } 668 return r == null ? Result.EMPTY_RESULT : r; 669 } 670 671 /** 672 * Execute an increment mutation. 673 */ 674 private Result increment(final HRegion region, final OperationQuota quota, 675 final MutationProto mutation, final CellScanner cells, long nonceGroup, 676 ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException { 677 long before = EnvironmentEdgeManager.currentTime(); 678 Increment increment = ProtobufUtil.toIncrement(mutation, cells); 679 checkCellSizeLimit(region, increment); 680 spaceQuota.getPolicyEnforcement(region).check(increment); 681 quota.addMutation(increment); 682 long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0; 683 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 684 Result r = region.increment(increment, nonceGroup, nonce); 685 final MetricsRegionServer metricsRegionServer = server.getMetrics(); 686 if (metricsRegionServer != null) { 687 long blockBytesScanned = 688 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 689 metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before, 690 blockBytesScanned); 691 } 692 return r == null ? Result.EMPTY_RESULT : r; 693 } 694 695 /** 696 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when 697 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation. 698 * @param cellsToReturn Could be null. May be allocated in this method. 
   *                      This is what this method
   *                      returns as a 'result'.
   * @param closeCallBack the callback to be used with multigets
   * @param context       the current RpcCallContext
   * @return Return the <code>cellScanner</code> passed
   */
  private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
    final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
    final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
    final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
    // one at a time, we instead pass them in batch. Be aware that the corresponding
    // ResultOrException instance that matches each Put or Delete is then added down in the
    // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
    // deferred/batched
    List<ClientProtos.Action> mutations = null;
    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
    IOException sizeIOE = null;
    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
      ResultOrException.newBuilder();
    boolean hasResultOrException = false;
    for (ClientProtos.Action action : actions.getActionList()) {
      hasResultOrException = false;
      resultOrExceptionBuilder.clear();
      try {
        Result r = null;
        long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
        // Once the accumulated response exceeds the quota-derived size limit, stop executing
        // actions and answer each remaining one with the (cached) too-large exception.
        if (
          context != null && context.isRetryImmediatelySupported()
            && (context.getResponseCellSize() > maxQuotaResultSize
              || blockBytesScannedBefore + context.getResponseExceptionSize() > maxQuotaResultSize)
        ) {

          // We're storing the exception since the exception and reason string won't
          // change after the response size limit is reached.
          if (sizeIOE == null) {
            // We don't need the stack un-winding do don't throw the exception.
            // Throwing will kill the JVM's JIT.
            //
            // Instead just create the exception and then store it.
            sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
              + context.getResponseCellSize() + " BlockSize: " + blockBytesScannedBefore);

            // Only report the exception once since there's only one request that
            // caused the exception. Otherwise this number will dominate the exceptions count.
            rpcServer.getMetrics().exception(sizeIOE);
          }

          // Now that there's an exception is known to be created
          // use it for the response.
          //
          // This will create a copy in the builder.
          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
          resultOrExceptionBuilder.setException(pair);
          context.incrementResponseExceptionSize(pair.getSerializedSize());
          resultOrExceptionBuilder.setIndex(action.getIndex());
          builder.addResultOrException(resultOrExceptionBuilder.build());
          // Keep the shared cell scanner aligned with the skipped action.
          skipCellsForMutation(action, cellScanner);
          continue;
        }
        if (action.hasGet()) {
          long before = EnvironmentEdgeManager.currentTime();
          ClientProtos.Get pbGet = action.getGet();
          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
          // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
              + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
              + "reverse Scan.");
          }
          try {
            Get get = ProtobufUtil.toGet(pbGet);
            if (context != null) {
              r = get(get, (region), closeCallBack, context);
            } else {
              r = region.get(get);
            }
          } finally {
            // Record get latency/IO even when the get itself threw.
            final MetricsRegionServer metricsRegionServer = server.getMetrics();
            if (metricsRegionServer != null) {
              long blockBytesScanned =
                context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
              metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
                blockBytesScanned);
            }
          }
        } else if (action.hasServiceCall()) {
          hasResultOrException = true;
          Message result = execServiceOnRegion(region, action.getServiceCall());
          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
            ClientProtos.CoprocessorServiceResult.newBuilder();
          resultOrExceptionBuilder.setServiceResult(serviceResultBuilder
            .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName())
              // TODO: Copy!!!
              .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
        } else if (action.hasMutation()) {
          MutationType type = action.getMutation().getMutateType();
          if (
            type != MutationType.PUT && type != MutationType.DELETE && mutations != null
              && !mutations.isEmpty()
          ) {
            // Flush out any Puts or Deletes already collected.
            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
              spaceQuotaEnforcement);
            mutations.clear();
          }
          switch (type) {
            case APPEND:
              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement, context);
              break;
            case INCREMENT:
              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement, context);
              break;
            case PUT:
            case DELETE:
              // Collect the individual mutations and apply in a batch
              if (mutations == null) {
                mutations = new ArrayList<>(actions.getActionCount());
              }
              mutations.add(action);
              break;
            default:
              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
          }
        } else {
          throw new HBaseIOException("Unexpected Action type");
        }
        if (r != null) {
          ClientProtos.Result pbResult = null;
          if (isClientCellBlockSupport(context)) {
            pbResult = ProtobufUtil.toResultNoData(r);
            // Hard to guess the size here. Just make a rough guess.
            if (cellsToReturn == null) {
              cellsToReturn = new ArrayList<>();
            }
            cellsToReturn.add(r);
          } else {
            pbResult = ProtobufUtil.toResult(r);
          }
          addSize(context, r);
          hasResultOrException = true;
          resultOrExceptionBuilder.setResult(pbResult);
        }
        // Could get to here and there was no result and no exception. Presumes we added
        // a Put or Delete to the collecting Mutations List for adding later. In this
        // case the corresponding ResultOrException instance for the Put or Delete will be added
        // down in the doNonAtomicBatchOp method call rather than up here.
      } catch (IOException ie) {
        rpcServer.getMetrics().exception(ie);
        hasResultOrException = true;
        NameBytesPair pair = ResponseConverter.buildException(ie);
        resultOrExceptionBuilder.setException(pair);
        // NOTE(review): unlike the size-limit branch above, context is used here without a null
        // check — confirm every caller reaching this catch supplies a non-null context.
        context.incrementResponseExceptionSize(pair.getSerializedSize());
      }
      if (hasResultOrException) {
        // Propagate index.
        resultOrExceptionBuilder.setIndex(action.getIndex());
        builder.addResultOrException(resultOrExceptionBuilder.build());
      }
    }
    // Finish up any outstanding mutations
    if (!CollectionUtils.isEmpty(mutations)) {
      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
    }
    return cellsToReturn;
  }

  /**
   * Reject any cell of the mutation whose estimated serialized size exceeds the region's
   * configured max cell size. No-op when the limit is not positive.
   * @throws DoNotRetryIOException when an oversize cell is found
   */
  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
    if (r.maxCellSize > 0) {
      CellScanner cells = m.cellScanner();
      while (cells.advance()) {
        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
        if (size > r.maxCellSize) {
          String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of "
            + r.maxCellSize + " bytes";
          LOG.debug(msg);
          throw new DoNotRetryIOException(msg);
        }
      }
    }
  }

  /**
   * Execute an atomic batch of mutations; exceptions propagate so the caller can attach a
   * region-level exception to the whole RegionAction.
   */
  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Just throw the exception. The exception will be caught and then added to region-level
    // exception for RegionAction. Leaving the null to action result is ok since the null
    // result is viewed as failure by hbase client. And the region-lever exception will be used
    // to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and
    // AsyncBatchRpcRetryingCaller#onComplete for more details.
    doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
  }

  /**
   * Execute a non-atomic batch of mutations. On IOException every action in the batch gets the
   * exception as its ResultOrException instead of failing the whole RegionAction.
   */
  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    try {
      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
        spaceQuotaEnforcement, false);
    } catch (IOException e) {
      // Set the exception for each action. The mutations in same RegionAction are group to
      // different batch and then be processed individually. Hence, we don't set the region-level
      // exception here for whole RegionAction.
      for (Action mutation : mutations) {
        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
      }
    }
  }

  /**
   * Execute a list of mutations.
   */
  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
    throws IOException {
    Mutation[] mArray = new Mutation[mutations.size()];
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      /**
       * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions
       * since mutation array may have been reoredered.In order to return the right result or
       * exception to the corresponding actions, We need to know which action is the mutation belong
       * to. We can't sort ClientProtos.Action array, since they are bonded to cellscanners.
       */
      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
      int i = 0;
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : mutations) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto m = action.getMutation();
        Mutation mutation;
        switch (m.getMutateType()) {
          case PUT:
            mutation = ProtobufUtil.toPut(m, cells);
            batchContainsPuts = true;
            break;

          case DELETE:
            mutation = ProtobufUtil.toDelete(m, cells);
            batchContainsDelete = true;
            break;

          case INCREMENT:
            mutation = ProtobufUtil.toIncrement(m, cells);
            // NOTE(review): a single nonce variable is kept for the whole batch, so the last
            // Increment/Append in the list wins — confirm batches carry at most one nonce.
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          case APPEND:
            mutation = ProtobufUtil.toAppend(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          default:
            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
        }
        mutationActionMap.put(mutation, action);
        mArray[i++] = mutation;
        checkCellSizeLimit(region, mutation);
        // Check if a space quota disallows this mutation
        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
        quota.addMutation(mutation);
      }

      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }

      // HBASE-17924
      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
      // order is preserved as its expected from the client
      if (!atomic) {
        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
      }

      OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);

      // When atomic is true, it indicates that the mutateRow API or the batch API with
      // RowMutations is called. In this case, we need to merge the results of the
      // Increment/Append operations if the mutations include those operations, and set the merged
      // result to the first element of the ResultOrException list
      if (atomic) {
        List<ResultOrException> resultOrExceptions = new ArrayList<>();
        List<Result> results = new ArrayList<>();
        for (i = 0; i < codes.length; i++) {
          if (codes[i].getResult() != null) {
            results.add(codes[i].getResult());
          }
          // Elements 1..n-1 get default (empty) results; element 0 carries the merged result.
          if (i != 0) {
            resultOrExceptions
              .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i));
          }
        }

        if (results.isEmpty()) {
          builder.addResultOrException(
            getResultOrException(ClientProtos.Result.getDefaultInstance(), 0));
        } else {
          // Merge the results of the Increment/Append operations
          List<Cell> cellList = new ArrayList<>();
          for (Result result : results) {
            if (result.rawCells() != null) {
              cellList.addAll(Arrays.asList(result.rawCells()));
            }
          }
          Result result = Result.create(cellList);

          // Set the merged result of the Increment/Append operations to the first element of the
          // ResultOrException list
          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
        }

        builder.addAllResultOrException(resultOrExceptions);
        return;
      }

      for (i = 0; i < codes.length; i++) {
        Mutation currentMutation = mArray[i];
        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
        Exception e;
        // NOTE(review): the default label is not last in this switch. Behavior is unaffected
        // because every case ends in break, but keep the ordering in mind when editing.
        switch (codes[i].getOperationStatusCode()) {
          case BAD_FAMILY:
            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SANITY_CHECK_FAILURE:
            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          default:
            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SUCCESS:
            builder.addResultOrException(
              getResultOrException(ClientProtos.Result.getDefaultInstance(), index));
            break;

          case STORE_TOO_BUSY:
            e = new RegionTooBusyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;
        }
      }
    } finally {
      int processedMutationIndex = 0;
      for (Action mutation : mutations) {
        // The non-null mArray[i] means the cell scanner has been read.
        if (mArray[processedMutationIndex++] == null) {
          skipCellsForMutation(mutation, cells);
        }
      }
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  /**
   * Report put/delete batch latencies to the region server metrics source, if one is registered.
   */
  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
    boolean batchContainsDelete) {
    final MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      if (batchContainsPuts) {
        metricsRegionServer.updatePutBatch(region, after - starttime);
      }
      if (batchContainsDelete) {
        metricsRegionServer.updateDeleteBatch(region, after - starttime);
      }
    }
  }

  /**
   * Execute a list of Put/Delete mutations.
   * The function returns OperationStatus instead of
   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
   *         exceptionMessage if any
   * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying
   *             edits for secondary replicas any more, see
   *             {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}.
   */
  @Deprecated
  private OperationStatus[] doReplayBatchOp(final HRegion region,
    final List<MutationReplay> mutations, long replaySeqId) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
        MutationReplay m = it.next();

        if (m.getType() == MutationType.PUT) {
          batchContainsPuts = true;
        } else {
          batchContainsDelete = true;
        }

        // Mutations carrying METAFAMILY cells are WAL markers (compaction/flush/region-event/
        // bulk-load), not user edits: replay the marker and drop the mutation from the batch.
        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
        if (metaCells != null && !metaCells.isEmpty()) {
          for (Cell metaCell : metaCells) {
            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
            HRegion hRegion = region;
            if (compactionDesc != null) {
              // replay the compaction. Remove the files from stores only if we are the primary
              // region replica (thus own the files)
              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
                replaySeqId);
              continue;
            }
            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
            if (flushDesc != null && !isDefaultReplica) {
              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
              continue;
            }
            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
            if (regionEvent != null && !isDefaultReplica) {
              hRegion.replayWALRegionEventMarker(regionEvent);
              continue;
            }
            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
            if (bulkLoadEvent != null) {
              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
              continue;
            }
          }
          it.remove();
        }
      }
      requestCount.increment();
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
        replaySeqId);
    } finally {
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  private void closeAllScanners() {
    // Close any outstanding scanners. Means they'll get an UnknownScanner
    // exception next time they come in.
    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
      try {
        e.getValue().s.close();
      } catch (IOException ioe) {
        // Best-effort close: log and keep closing the remaining scanners.
        LOG.warn("Closing scanner " + e.getKey(), ioe);
      }
    }
  }

  // Directly invoked only for testing
  public RSRpcServices(final HRegionServer rs) throws IOException {
    super(rs, rs.getProcessName());
    // Pull all tunables from the region server's configuration once, at construction time.
    final Configuration conf = rs.getConfiguration();
    rowSizeWarnThreshold =
      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    rejectRowsWithSizeOverThreshold =
      conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
    rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
    rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
    // Closed scanner ids are remembered for one lease period so late clients get a clear error.
    closedScanners = CacheBuilder.newBuilder()
      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }

  @Override
  protected boolean defaultReservoirEnabled() {
    return true;
  }

  @Override
  protected ServerType getDNSServerType() {
    return DNS.ServerType.REGIONSERVER;
  }

  @Override
  protected String getHostname(Configuration conf, String defaultHostname) {
    return conf.get("hbase.regionserver.ipc.address", defaultHostname);
  }

  @Override
  protected String getPortConfigName() {
    return HConstants.REGIONSERVER_PORT;
  }

  @Override
  protected int getDefaultPort() {
    return HConstants.DEFAULT_REGIONSERVER_PORT;
  }

  @Override
  protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  /**
   * Build the RPC server bound to the given address.
   * @throws IOException wrapping any BindException with a hint about the port config property
   */
  protected RpcServerInterface createRpcServer(final Server server,
    final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
    final String name) throws IOException {
    final Configuration conf = server.getConfiguration();
    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
    try {
      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, // use final
        // bindAddress
        // for this
        // server.
        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
    } catch (BindException be) {
      throw new IOException(be.getMessage() + ". To switch ports use the '"
        + HConstants.REGIONSERVER_PORT + "' configuration property.",
        be.getCause() != null ? be.getCause() : be);
    }
  }

  protected Class<?> getRpcSchedulerFactoryClass() {
    final Configuration conf = server.getConfiguration();
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  protected PriorityFunction createPriority() {
    return new RSAnnotationReadingPriorityFunction(this);
  }

  public int getScannersCount() {
    return scanners.size();
  }

  /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
  RegionScanner getScanner(long scannerId) {
    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
    return rsh == null ? null : rsh.s;
  }

  /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null.
   */
  private RegionScannerHolder getRegionScannerHolder(long scannerId) {
    return scanners.get(toScannerName(scannerId));
  }

  /** Returns a "table/region/operation_id" summary for the scanner, or null if none found. */
  public String getScanDetailsWithId(long scannerId) {
    RegionScanner scanner = getScanner(scannerId);
    if (scanner == null) {
      return null;
    }
    StringBuilder builder = new StringBuilder();
    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
    builder.append(" operation_id: ").append(scanner.getOperationId());
    return builder.toString();
  }

  /** Returns a "table/region[/operation_id]" summary for the scan request, or null. */
  public String getScanDetailsWithRequest(ScanRequest request) {
    try {
      if (!request.hasRegion()) {
        return null;
      }
      Region region = getRegion(request.getRegion());
      StringBuilder builder = new StringBuilder();
      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
      for (NameBytesPair pair : request.getScan().getAttributeList()) {
        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
          break;
        }
      }
      return builder.toString();
    } catch (IOException ignored) {
      // Details are informational only; failure to resolve the region just yields null.
      return null;
    }
  }

  /**
   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
   */
  long getScannerVirtualTime(long scannerId) {
    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
    return rsh == null ? 0L : rsh.getNextCallSeq();
  }

  /**
   * Method to account for the size of retained cells. Adds the estimated serialized size of every
   * cell of <code>r</code> to the context's response cell size.
   * @param context rpc call context
   * @param r       result to add size.
   */
  void addSize(RpcCallContext context, Result r) {
    if (context != null && r != null && !r.isEmpty()) {
      for (Cell c : r.rawCells()) {
        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
      }
    }
  }

  /** Returns Remote client's ip and port else null if can't be determined. */
  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
  static String getRemoteClientIpAndPort() {
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    if (rpcCall == null) {
      return HConstants.EMPTY_STRING;
    }
    InetAddress address = rpcCall.getRemoteAddress();
    if (address == null) {
      return HConstants.EMPTY_STRING;
    }
    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
    // scanning than a hostname anyways.
    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
  }

  /** Returns Remote client's username.
   */
  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
  static String getUserName() {
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    if (rpcCall == null) {
      return HConstants.EMPTY_STRING;
    }
    return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING);
  }

  /**
   * Register a new open scanner: create its lease, wrap it in a holder with ship/close callbacks,
   * and publish it in the scanners map.
   */
  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
    HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
    Lease lease = server.getLeaseManager().createLease(scannerName, this.scannerLeaseTimeoutPeriod,
      new ScannerListener(scannerName));
    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
    // Scanners that are themselves RpcCallbacks handle their own close notification.
    RpcCallback closeCallback =
      s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s);
    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
      needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
    // Ids come from ScannerIdGenerator, so a collision indicates a bug, hence the assert.
    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
      + scannerName + ", " + existing;
    return rsh;
  }

  private boolean isFullRegionScan(Scan scan, HRegion region) {
    // If the scan start row equals or less than the start key of the region
    // and stop row greater than equals end key (if stop row present)
    // or if the stop row is empty
    // account this as a full region scan
    if (
      Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0
        && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0
          && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)
          || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
    ) {
      return true;
    }
    return false;
  }

  /**
   * Find the HRegion based on a region specifier
   * @param regionSpecifier the region specifier
   * @return the corresponding region
   * @throws IOException if the specifier is not null, but failed to find the region
   */
  public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException {
    return server.getRegion(regionSpecifier.getValue().toByteArray());
  }

  /**
   * Find the List of HRegions based on a list of region specifiers
   * @param regionSpecifiers the list of region specifiers
   * @return the corresponding list of regions
   * @throws IOException if any of the specifiers is not null, but failed to find the region
   */
  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
    final CacheEvictionStatsBuilder stats) {
    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
    for (RegionSpecifier regionSpecifier : regionSpecifiers) {
      try {
        regions.add(server.getRegion(regionSpecifier.getValue().toByteArray()));
      } catch (NotServingRegionException e) {
        // Record the miss per-region in the eviction stats rather than failing the whole list.
        stats.addException(regionSpecifier.getValue().toByteArray(), e);
      }
    }
    return regions;
  }

  PriorityFunction getPriority() {
    return priority;
  }

  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return server.getRegionServerRpcQuotaManager();
  }

  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return server.getRegionServerSpaceQuotaManager();
  }

  void start(ZKWatcher zkWatcher) {
    this.scannerIdGenerator = new ScannerIdGenerator(this.server.getServerName());
    internalStart(zkWatcher);
  }

  void stop() {
    closeAllScanners();
    internalStop();
  }

  /**
   * Called to verify that this server is up and running.
   */
  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
  protected void checkOpen() throws IOException {
    if (server.isAborted()) {
      throw new RegionServerAbortedException("Server " + server.getServerName() + " aborting");
    }
    if (server.isStopped()) {
      throw new RegionServerStoppedException("Server " + server.getServerName() + " stopping");
    }
    if (!server.isDataFileSystemOk()) {
      throw new RegionServerStoppedException("File system not available");
    }
    if (!server.isOnline()) {
      throw new ServerNotRunningYetException(
        "Server " + server.getServerName() + " is not running yet");
    }
  }

  /**
   * By default, put up an Admin and a Client Service. Set booleans
   * <code>hbase.regionserver.admin.executorService</code> and
   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
   * Default is that both are enabled.
   * @return immutable list of blocking services and the security info classes that this server
   *         supports
   */
  protected List<BlockingServiceAndInterface> getServices() {
    boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
    boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
    boolean clientMeta =
      getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
    if (client) {
      bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
        ClientService.BlockingInterface.class));
    }
    if (admin) {
      bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
        AdminService.BlockingInterface.class));
    }
    if (clientMeta) {
      bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
        ClientMetaService.BlockingInterface.class));
    }
    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
  }

  /**
   * Close a region on the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public CloseRegionResponse closeRegion(final RpcController controller,
    final CloseRegionRequest request) throws ServiceException {
    // Destination server is only present when the close is part of a region move.
    final ServerName sn = (request.hasDestinationServer()
      ? ProtobufUtil.toServerName(request.getDestinationServer())
      : null);

    try {
      checkOpen();
      throwOnWrongStartCode(request);
      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());

      requestCount.increment();
      if (sn == null) {
        LOG.info("Close " + encodedRegionName + " without moving");
      } else {
        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
      }
      boolean closed = server.closeRegion(encodedRegionName, false, sn);
      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Compact a region on the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public CompactRegionResponse compactRegion(final RpcController controller,
    final CompactRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      // Quota support is enabled, the requesting user is not system/super user
      // and a quota policy is enforced that disables compactions.
1518 if ( 1519 QuotaUtil.isQuotaEnabled(getConfiguration()) 1520 && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null)) 1521 && this.server.getRegionServerSpaceQuotaManager() 1522 .areCompactionsDisabled(region.getTableDescriptor().getTableName()) 1523 ) { 1524 throw new DoNotRetryIOException( 1525 "Compactions on this region are " + "disabled due to a space quota violation."); 1526 } 1527 region.startRegionOperation(Operation.COMPACT_REGION); 1528 LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString()); 1529 boolean major = request.hasMajor() && request.getMajor(); 1530 if (request.hasFamily()) { 1531 byte[] family = request.getFamily().toByteArray(); 1532 String log = "User-triggered " + (major ? "major " : "") + "compaction for region " 1533 + region.getRegionInfo().getRegionNameAsString() + " and family " 1534 + Bytes.toString(family); 1535 LOG.trace(log); 1536 region.requestCompaction(family, log, Store.PRIORITY_USER, major, 1537 CompactionLifeCycleTracker.DUMMY); 1538 } else { 1539 String log = "User-triggered " + (major ? 
"major " : "") + "compaction for region " 1540 + region.getRegionInfo().getRegionNameAsString(); 1541 LOG.trace(log); 1542 region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY); 1543 } 1544 return CompactRegionResponse.newBuilder().build(); 1545 } catch (IOException ie) { 1546 throw new ServiceException(ie); 1547 } 1548 } 1549 1550 @Override 1551 public CompactionSwitchResponse compactionSwitch(RpcController controller, 1552 CompactionSwitchRequest request) throws ServiceException { 1553 rpcPreCheck("compactionSwitch"); 1554 final CompactSplit compactSplitThread = server.getCompactSplitThread(); 1555 requestCount.increment(); 1556 boolean prevState = compactSplitThread.isCompactionsEnabled(); 1557 CompactionSwitchResponse response = 1558 CompactionSwitchResponse.newBuilder().setPrevState(prevState).build(); 1559 if (prevState == request.getEnabled()) { 1560 // passed in requested state is same as current state. No action required 1561 return response; 1562 } 1563 compactSplitThread.switchCompaction(request.getEnabled()); 1564 return response; 1565 } 1566 1567 /** 1568 * Flush a region on the region server. 1569 * @param controller the RPC controller 1570 * @param request the request 1571 */ 1572 @Override 1573 @QosPriority(priority = HConstants.ADMIN_QOS) 1574 public FlushRegionResponse flushRegion(final RpcController controller, 1575 final FlushRegionRequest request) throws ServiceException { 1576 try { 1577 checkOpen(); 1578 requestCount.increment(); 1579 HRegion region = getRegion(request.getRegion()); 1580 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); 1581 boolean shouldFlush = true; 1582 if (request.hasIfOlderThanTs()) { 1583 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs(); 1584 } 1585 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); 1586 if (shouldFlush) { 1587 boolean writeFlushWalMarker = 1588 request.hasWriteFlushWalMarker() ? 
request.getWriteFlushWalMarker() : false; 1589 // Go behind the curtain so we can manage writing of the flush WAL marker 1590 HRegion.FlushResultImpl flushResult = null; 1591 if (request.hasFamily()) { 1592 List families = new ArrayList(); 1593 families.add(request.getFamily().toByteArray()); 1594 flushResult = 1595 region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1596 } else { 1597 flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1598 } 1599 boolean compactionNeeded = flushResult.isCompactionNeeded(); 1600 if (compactionNeeded) { 1601 server.getCompactSplitThread().requestSystemCompaction(region, 1602 "Compaction through user triggered flush"); 1603 } 1604 builder.setFlushed(flushResult.isFlushSucceeded()); 1605 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker); 1606 } 1607 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores()); 1608 return builder.build(); 1609 } catch (DroppedSnapshotException ex) { 1610 // Cache flush can fail in a few places. If it fails in a critical 1611 // section, we get a DroppedSnapshotException and a replay of wal 1612 // is required. Currently the only way to do this is a restart of 1613 // the server. 1614 server.abort("Replay of WAL required. 
Forcing server shutdown", ex); 1615 throw new ServiceException(ex); 1616 } catch (IOException ie) { 1617 throw new ServiceException(ie); 1618 } 1619 } 1620 1621 @Override 1622 @QosPriority(priority = HConstants.ADMIN_QOS) 1623 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, 1624 final GetOnlineRegionRequest request) throws ServiceException { 1625 try { 1626 checkOpen(); 1627 requestCount.increment(); 1628 Map<String, HRegion> onlineRegions = server.getOnlineRegions(); 1629 List<RegionInfo> list = new ArrayList<>(onlineRegions.size()); 1630 for (HRegion region : onlineRegions.values()) { 1631 list.add(region.getRegionInfo()); 1632 } 1633 list.sort(RegionInfo.COMPARATOR); 1634 return ResponseConverter.buildGetOnlineRegionResponse(list); 1635 } catch (IOException ie) { 1636 throw new ServiceException(ie); 1637 } 1638 } 1639 1640 // Master implementation of this Admin Service differs given it is not 1641 // able to supply detail only known to RegionServer. See note on 1642 // MasterRpcServers#getRegionInfo. 
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
    final GetRegionInfoRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      RegionInfo info = region.getRegionInfo();
      byte[] bestSplitRow;
      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
        bestSplitRow = region.checkSplit(true).orElse(null);
        // when all table data are in memstore, bestSplitRow = null
        // try to flush region first
        if (bestSplitRow == null) {
          region.flush(true);
          bestSplitRow = region.checkSplit(true).orElse(null);
        }
      } else {
        bestSplitRow = null;
      }
      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
      if (request.hasCompactionState() && request.getCompactionState()) {
        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
      }
      builder.setSplittable(region.isSplittable());
      builder.setMergeable(region.isMergeable());
      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Build a {@link RegionLoad} for each region on this server, or only for the regions of the
   * table named in the request when one is given.
   */
  // NOTE(review): unlike most admin RPCs here, this method does not call checkOpen() before
  // serving — confirm whether that is intentional.
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
    throws ServiceException {

    List<HRegion> regions;
    if (request.hasTableName()) {
      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
      regions = server.getRegions(tableName);
    } else {
      regions = server.getRegions();
    }
    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
    // Builders are reused across regions by createRegionLoad.
    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();

    try {
      for (HRegion region : regions) {
        rLoads.add(server.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
      }
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
    builder.addAllRegionLoads(rLoads);
    return builder.build();
  }

  /**
   * Clear the named compaction queues ("long" and/or "short") on this server. Only one caller may
   * clear at a time; concurrent requests are ignored with a warning. Unknown queue names fail the
   * whole request.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
    ClearCompactionQueuesRequest request) throws ServiceException {
    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
      + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
    requestCount.increment();
    // CAS guard: only one clear operation may run at a time.
    if (clearCompactionQueues.compareAndSet(false, true)) {
      final CompactSplit compactSplitThread = server.getCompactSplitThread();
      try {
        checkOpen();
        server.getRegionServerCoprocessorHost().preClearCompactionQueues();
        for (String queueName : request.getQueueNameList()) {
          LOG.debug("clear " + queueName + " compaction queue");
          switch (queueName) {
            case "long":
              compactSplitThread.clearLongCompactionsQueue();
              break;
            case "short":
              compactSplitThread.clearShortCompactionsQueue();
              break;
            default:
              LOG.warn("Unknown queue name " + queueName);
              throw new IOException("Unknown queue name " + queueName);
          }
        }
        server.getRegionServerCoprocessorHost().postClearCompactionQueues();
      } catch (IOException ie) {
        throw new ServiceException(ie);
      } finally {
        // Always release the guard so later requests can proceed.
        clearCompactionQueues.set(false);
      }
    } else {
      LOG.warn("Clear compactions queue is executing by other admin.");
    }
    return respBuilder.build();
  }

  /**
   * Get some information of the region server.
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetServerInfoResponse getServerInfo(final RpcController controller,
    final GetServerInfoRequest request) throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
    requestCount.increment();
    // -1 signals "no info server running".
    int infoPort = server.getInfoServer() != null ? server.getInfoServer().getPort() : -1;
    return ResponseConverter.buildGetServerInfoResponse(server.getServerName(), infoPort);
  }

  /**
   * List the store file paths of a region, restricted to the requested column families, or to all
   * families of the region when the request names none.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetStoreFileResponse getStoreFile(final RpcController controller,
    final GetStoreFileRequest request) throws ServiceException {
    try {
      checkOpen();
      HRegion region = getRegion(request.getRegion());
      requestCount.increment();
      Set<byte[]> columnFamilies;
      if (request.getFamilyCount() == 0) {
        // No families specified: include every family of the table.
        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
      } else {
        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
        for (ByteString cf : request.getFamilyList()) {
          columnFamilies.add(cf.toByteArray());
        }
      }
      int nCF = columnFamilies.size();
      List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
      builder.addAllStoreFile(fileList);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /** Reject an open request carrying a start code that does not match this server's. */
  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  /** Reject a close request carrying a start code that does not match this server's. */
  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  /**
   * Core start-code check: fail with DoNotRetryIOException when the RPC was addressed to a
   * previous incarnation of this server (different start code).
   */
  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
    // check that we are the same server that this RPC is intended for.
    if (server.getServerName().getStartcode() != serverStartCode) {
      throw new ServiceException(new DoNotRetryIOException(
        "This RPC was intended for a " + "different server with startCode: " + serverStartCode
          + ", this server is: " + server.getServerName()));
    }
  }

  /** Apply the start-code check to every open/close sub-request of an ExecuteProcedures call. */
  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
    if (req.getOpenRegionCount() > 0) {
      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
        throwOnWrongStartCode(openReq);
      }
    }
    if (req.getCloseRegionCount() > 0) {
      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
        throwOnWrongStartCode(closeReq);
      }
    }
  }

  /**
   * Open asynchronously a region or a set of regions on the region server. The opening is
   * coordinated by ZooKeeper, and this method requires the znode to be created before being called.
   * As a consequence, this method should be called only from the master.
   * <p>
   * Different managed states for the region are:
   * </p>
   * <ul>
   * <li>region not opened: the region opening will start asynchronously.</li>
   * <li>a close is already in progress: this is considered as an error.</li>
   * <li>an open is already in progress: this new open request will be ignored. This is important
   * because the Master can do multiple requests if it crashes.</li>
   * <li>the region is already opened: this new open request will be ignored.</li>
   * </ul>
   * <p>
   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
   * errors are put in the response as FAILED_OPENING.
   * </p>
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public OpenRegionResponse openRegion(final RpcController controller,
    final OpenRegionRequest request) throws ServiceException {
    requestCount.increment();
    throwOnWrongStartCode(request);

    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
    final int regionCount = request.getOpenInfoCount();
    // Table descriptors are cached per call so a bulk assign fetches each one only once.
    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
    final boolean isBulkAssign = regionCount > 1;
    try {
      checkOpen();
    } catch (IOException ie) {
      // checkOpen failed. A single-region open of hbase:meta is allowed to wait for the server
      // to come online (below); everything else fails immediately.
      TableName tableName = null;
      if (regionCount == 1) {
        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
          request.getOpenInfo(0).getRegion();
        if (ri != null) {
          tableName = ProtobufUtil.toTableName(ri.getTableName());
        }
      }
      if (!TableName.META_TABLE_NAME.equals(tableName)) {
        throw new ServiceException(ie);
      }
      // We are assigning meta, wait a little for regionserver to finish initialization.
      // Default to quarter of RPC timeout
      int timeout = server.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
      synchronized (server.online) {
        try {
          while (
            EnvironmentEdgeManager.currentTime() <= endTime && !server.isStopped()
              && !server.isOnline()
          ) {
            server.online.wait(server.getMsgInterval());
          }
          checkOpen();
        } catch (InterruptedException t) {
          Thread.currentThread().interrupt();
          throw new ServiceException(t);
        } catch (IOException e) {
          throw new ServiceException(e);
        }
      }
    }

    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;

    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableDescriptor htd;
      try {
        String encodedName = region.getEncodedName();
        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
        final HRegion onlineRegion = server.getRegion(encodedName);
        if (onlineRegion != null) {
          // The region is already online. This should not happen any more.
          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
            + ", which is already online";
          LOG.warn(error);
          // server.abort(error);
          // throw new IOException(error);
          builder.addOpeningState(RegionOpeningState.OPENED);
          continue;
        }
        LOG.info("Open " + region.getRegionNameAsString());

        // Mark the region as in-transition; the previous value tells us whether an open
        // (TRUE) or close (FALSE) was already in progress.
        final Boolean previous =
          server.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);

        if (Boolean.FALSE.equals(previous)) {
          if (server.getRegion(encodedName) != null) {
            // There is a close in progress. This should not happen any more.
            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
              + ", which we are already trying to CLOSE";
            server.abort(error);
            throw new IOException(error);
          }
          server.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
        }

        if (Boolean.TRUE.equals(previous)) {
          // An open is in progress. This is supported, but let's log this.
          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
            + ", which we are already trying to OPEN"
            + " - ignoring this new request for this region.");
        }

        // We are opening this region. If it moves back and forth for whatever reason, we don't
        // want to keep returning the stale moved record while we are opening/if we close again.
        server.removeFromMovedRegions(region.getEncodedName());

        if (previous == null || !previous.booleanValue()) {
          htd = htds.get(region.getTable());
          if (htd == null) {
            htd = server.getTableDescriptors().get(region.getTable());
            htds.put(region.getTable(), htd);
          }
          if (htd == null) {
            throw new IOException("Missing table descriptor for " + region.getEncodedName());
          }
          // If there is no action in progress, we can submit a specific handler.
          // Need to pass the expected version in the constructor.
          if (server.getExecutorService() == null) {
            LOG.info("No executor executorService; skipping open request");
          } else {
            if (region.isMetaRegion()) {
              server.getExecutorService()
                .submit(new OpenMetaHandler(server, server, region, htd, masterSystemTime));
            } else {
              if (regionOpenInfo.getFavoredNodesCount() > 0) {
                server.updateRegionFavoredNodesMapping(region.getEncodedName(),
                  regionOpenInfo.getFavoredNodesList());
              }
              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
                server.getExecutorService().submit(
                  new OpenPriorityRegionHandler(server, server, region, htd, masterSystemTime));
              } else {
                server.getExecutorService()
                  .submit(new OpenRegionHandler(server, server, region, htd, masterSystemTime));
              }
            }
          }
        }

        builder.addOpeningState(RegionOpeningState.OPENED);
      } catch (IOException ie) {
        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
        if (isBulkAssign) {
          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
        } else {
          throw new ServiceException(ie);
        }
      }
    }
    return builder.build();
  }

  /**
   * Warmup a region on this server. This method should only be called by Master. It synchronously
   * opens the region and closes the region bringing the most important pages in cache.
   */
  @Override
  public WarmupRegionResponse warmupRegion(final RpcController controller,
    final WarmupRegionRequest request) throws ServiceException {
    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
    try {
      checkOpen();
      String encodedName = region.getEncodedName();
      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
      final HRegion onlineRegion = server.getRegion(encodedName);
      if (onlineRegion != null) {
        LOG.info("{} is online; skipping warmup", region);
        return response;
      }
      // NOTE(review): htd is not null-checked before being handed to warmupHRegion — confirm
      // warmupHRegion tolerates a missing table descriptor.
      TableDescriptor htd = server.getTableDescriptors().get(region.getTable());
      if (server.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
        LOG.info("{} is in transition; skipping warmup", region);
        return response;
      }
      LOG.info("Warmup {}", region.getRegionNameAsString());
      HRegion.warmupHRegion(region, htd, server.getWAL(region), server.getConfiguration(), server,
        null);
    } catch (IOException ie) {
      LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
      throw new ServiceException(ie);
    }

    return response;
  }

  /** Detach and return the cell scanner from the controller so the payload is consumed once. */
  private CellScanner getAndReset(RpcController controller) {
    HBaseRpcController hrc = (HBaseRpcController) controller;
    CellScanner cells = hrc.cellScanner();
    hrc.setCellScanner(null);
    return cells;
  }

  /**
   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
   * that the given mutations will be durable on the receiving RS if this method returns without any
   * exception.
   * @param controller the RPC controller
   * @param request the request
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for
   *             compatibility with old region replica implementation. Now we will use
   *             {@code replicateToReplica} method instead.
   */
  @Deprecated
  @Override
  @QosPriority(priority = HConstants.REPLAY_QOS)
  public ReplicateWALEntryResponse replay(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    CellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      // All entries in one request must target the same region (verified per entry below).
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      RegionCoprocessorHost coprocessorHost =
        ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
          ? region.getCoprocessorHost()
          : null; // do not invoke coprocessors if this is a secondary region replica
      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();

      // Skip adding the edits to WAL if this is a secondary region replica
      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;

      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException("Replay request contains entries from multiple "
            + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
            + entry.getKey().getEncodedRegionName());
        }
        if (server.nonceManager != null && isPrimary) {
          // Record nonces seen in the WAL so replayed operations are not re-executed.
          long nonceGroup =
            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          server.nonceManager.reportOperationFromWal(nonceGroup, nonce,
            entry.getKey().getWriteTime());
        }
        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
        List<MutationReplay> edits =
          WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
        if (coprocessorHost != null) {
          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
          // KeyValue.
          if (
            coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
              walEntry.getSecond())
          ) {
            // if bypass this log entry, ignore it ...
            continue;
          }
          walEntries.add(walEntry);
        }
        if (edits != null && !edits.isEmpty()) {
          // HBASE-17924
          // sort to improve lock efficiency
          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
          long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
            ? entry.getKey().getOrigSequenceNumber()
            : entry.getKey().getLogSequenceNumber();
          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
          // check if it's a partial success
          for (int i = 0; result != null && i < result.length; i++) {
            if (result[i] != OperationStatus.SUCCESS) {
              throw new IOException(result[i].getExceptionMsg());
            }
          }
        }
      }

      // sync wal at the end because ASYNC_WAL is used above
      WAL wal = region.getWAL();
      if (wal != null) {
        wal.sync();
      }

      if (coprocessorHost != null) {
        for (Pair<WALKey, WALEdit> entry : walEntries) {
          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
            entry.getSecond());
        }
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
      }
    }
  }

  /**
   * Replay the given changes on a secondary replica
   */
  @Override
  public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
    ReplicateWALEntryRequest request) throws ServiceException {
    CellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
        throw new DoNotRetryIOException(
          "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
      }
      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException(
            "ReplicateToReplica request contains entries from multiple " + "regions. First region:"
              + regionName.toStringUtf8() + " , other region:"
              + entry.getKey().getEncodedRegionName());
        }
        region.replayWALEntry(entry, cells);
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Reject a replication batch when this (sink) cluster's sync-replication state for the target
   * table forbids applying remote entries.
   */
  private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
    ReplicationSourceService replicationSource = server.getReplicationSourceService();
    if (replicationSource == null || entries.isEmpty()) {
      return;
    }
    // We can ensure that all entries are for one peer, so only need to check one entry's
    // table name. if the table hit sync replication at peer side and the peer cluster
    // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply
    // those entries according to the design doc.
    TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray());
    if (
      replicationSource.getSyncReplicationPeerInfoProvider().checkState(table,
        RejectReplicationRequestStateChecker.get())
    ) {
      throw new DoNotRetryIOException(
        "Reject to apply to sink cluster because sync replication state of sink cluster "
          + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table);
    }
  }

  /**
   * Replicate WAL entries on the region server.
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.REPLICATION_QOS)
  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
    try {
      checkOpen();
      if (server.getReplicationSinkService() != null) {
        requestCount.increment();
        List<WALEntry> entries = request.getEntryList();
        checkShouldRejectReplicationRequest(entries);
        CellScanner cellScanner = getAndReset(controller);
        server.getRegionServerCoprocessorHost().preReplicateLogEntries();
        server.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
          request.getSourceHFileArchiveDirPath());
        server.getRegionServerCoprocessorHost().postReplicateLogEntries();
        return ReplicateWALEntryResponse.newBuilder().build();
      } else {
        throw new ServiceException("Replication services are not initialized yet");
      }
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Roll the WAL writer of the region server.
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  public RollWALWriterResponse rollWALWriter(final RpcController controller,
    final RollWALWriterRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      server.getRegionServerCoprocessorHost().preRollWALWriterRequest();
      server.getWalRoller().requestRollAll();
      server.getRegionServerCoprocessorHost().postRollWALWriterRequest();
      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Stop the region server.
 * @param controller the RPC controller
 * @param request the request
 */
@Override
@QosPriority(priority = HConstants.ADMIN_QOS)
public StopServerResponse stopServer(final RpcController controller,
  final StopServerRequest request) throws ServiceException {
  rpcPreCheck("stopServer");
  requestCount.increment();
  String reason = request.getReason();
  server.stop(reason);
  return StopServerResponse.newBuilder().build();
}

@Override
public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
  UpdateFavoredNodesRequest request) throws ServiceException {
  rpcPreCheck("updateFavoredNodes");
  List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
  UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
  for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
    RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
    // Only update the mapping when the request actually carries favored nodes; entries
    // without nodes are skipped but still counted in the response below.
    if (regionUpdateInfo.getFavoredNodesCount() > 0) {
      server.updateRegionFavoredNodesMapping(hri.getEncodedName(),
        regionUpdateInfo.getFavoredNodesList());
    }
  }
  // Response reports how many region entries were in the request, not how many were applied.
  respBuilder.setResponse(openInfoList.size());
  return respBuilder.build();
}

/**
 * Atomically bulk load several HFiles into an open region
 * @return true if successful, false if failed but recoverably (no action)
 * @throws ServiceException if failed unrecoverably
 */
@Override
public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
  final BulkLoadHFileRequest request) throws ServiceException {
  long start = EnvironmentEdgeManager.currentTime();
  List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
  if (clusterIds.contains(this.server.getClusterId())) {
    // The load has already traversed this cluster (replication loop guard): report success
    // without re-loading.
    return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
  } else {
    // Record this cluster in the id list so downstream clusters can detect loops.
    clusterIds.add(this.server.getClusterId());
  }
  try {
    checkOpen();
    requestCount.increment();
    HRegion region = getRegion(request.getRegion());
    final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
    // Negative means "size unknown / quota not computed"; see the guard further down.
    long sizeToBeLoaded = -1;

    // Check to see if this bulk load would exceed the space quota for this table
    if (spaceQuotaEnabled) {
      ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
      SpaceViolationPolicyEnforcement enforcement =
        activeSpaceQuotas.getPolicyEnforcement(region);
      if (enforcement != null) {
        // Bulk loads must still be atomic. We must enact all or none.
        List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
        for (FamilyPath familyPath : request.getFamilyPathList()) {
          filePaths.add(familyPath.getPath());
        }
        // Check if the batch of files exceeds the current quota
        sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
      }
    }
    // secure bulk load
    Map<byte[], List<Path>> map =
      server.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
    BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
    builder.setLoaded(map != null);
    if (map != null) {
      // Treat any negative size as a flag to "ignore" updating the region size as that is
      // not possible to occur in real life (cannot bulk load a file with negative size)
      if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
            + sizeToBeLoaded + " bytes");
        }
        // Inform space quotas of the new files for this region
        getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
          sizeToBeLoaded);
      }
    }
    return builder.build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  } finally {
    // Record latency on both success and failure paths.
    final MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
    }
  }
}

@Override
public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
  PrepareBulkLoadRequest request) throws ServiceException {
  try {
    checkOpen();
    requestCount.increment();

    HRegion region = getRegion(request.getRegion());

    // The bulk token identifies the staging directory for the subsequent load/cleanup calls.
    String bulkToken = server.getSecureBulkLoadManager().prepareBulkLoad(region, request);
    PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
    builder.setBulkToken(bulkToken);
    return builder.build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}

@Override
public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
  CleanupBulkLoadRequest request) throws ServiceException {
  try {
    checkOpen();
    requestCount.increment();

    HRegion region = getRegion(request.getRegion());

    server.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
    return CleanupBulkLoadResponse.newBuilder().build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}

@Override
public CoprocessorServiceResponse execService(final RpcController controller,
  final CoprocessorServiceRequest request) throws ServiceException {
  try {
    checkOpen();
    requestCount.increment();
    HRegion region = getRegion(request.getRegion());
    Message result = execServiceOnRegion(region, request.getCall());
    CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
    builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
      region.getRegionInfo().getRegionName()));
    // TODO: COPIES!!!!!!
    builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
      org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
    return builder.build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}

/**
 * Resolve the filesystem that hosts the given bulk-load source files.
 * @param filePaths source file paths; when empty, the server's own filesystem is returned
 */
private FileSystem getFileSystem(List<String> filePaths) throws IOException {
  if (filePaths.isEmpty()) {
    // local hdfs
    return server.getFileSystem();
  }
  // source hdfs
  return new Path(filePaths.get(0)).getFileSystem(server.getConfiguration());
}

/** Dispatch a coprocessor endpoint call to the given region. */
private Message execServiceOnRegion(HRegion region,
  final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
  // ignore the passed in controller (from the serialized call)
  ServerRpcController execController = new ServerRpcController();
  return region.execService(execController, serviceCall);
}

/**
 * @return true when client requests against this region must be rejected because the table is
 *         in sync-replication STANDBY state on this cluster.
 */
private boolean shouldRejectRequestsFromClient(HRegion region) {
  TableName table = region.getRegionInfo().getTable();
  ReplicationSourceService service = server.getReplicationSourceService();
  return service != null && service.getSyncReplicationPeerInfoProvider().checkState(table,
    RejectRequestsFromClientStateChecker.get());
}

/** Throw (do-not-retry) if the region's table is in sync-replication STANDBY state. */
private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
  if (shouldRejectRequestsFromClient(region)) {
    throw new DoNotRetryIOException(
      region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state.");
  }
}

/**
 * Get data from a table.
 * @param controller the RPC controller
 * @param request the get request
 */
@Override
public GetResponse get(final RpcController controller, final GetRequest request)
  throws ServiceException {
  long before = EnvironmentEdgeManager.currentTime();
  OperationQuota quota = null;
  HRegion region = null;
  RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
  try {
    checkOpen();
    requestCount.increment();
    rpcGetRequestCount.increment();
    region = getRegion(request.getRegion());
    rejectIfInStandByState(region);

    GetResponse.Builder builder = GetResponse.newBuilder();
    ClientProtos.Get get = request.getGet();
    // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
    // a get closest before. Throwing the UnknownProtocolException signals it that it needs
    // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
    // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
    if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
      throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
        + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
        + "reverse Scan.");
    }
    Boolean existence = null;
    Result r = null;
    quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);

    Get clientGet = ProtobufUtil.toGet(get);
    // preExists may short-circuit the whole get; existence != null means a coprocessor
    // answered the exists-check itself.
    if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
      existence = region.getCoprocessorHost().preExists(clientGet);
    }
    if (existence == null) {
      if (context != null) {
        r = get(clientGet, (region), null, context);
      } else {
        // for test purpose
        r = region.get(clientGet);
      }
      if (get.getExistenceOnly()) {
        boolean exists = r.getExists();
        if (region.getCoprocessorHost() != null) {
          exists = region.getCoprocessorHost().postExists(clientGet, exists);
        }
        existence = exists;
      }
    }
    if (existence != null) {
      // Existence-only result; "stale" when served by a non-default replica.
      ClientProtos.Result pbr =
        ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
      builder.setResult(pbr);
    } else if (r != null) {
      ClientProtos.Result pbr;
      if (
        isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
          && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
      ) {
        // Client supports cellblocks: ship cells out-of-band and send a data-less pb Result.
        pbr = ProtobufUtil.toResultNoData(r);
        ((HBaseRpcController) controller)
          .setCellScanner(CellUtil.createCellScanner(r.rawCells()));
        addSize(context, r);
      } else {
        pbr = ProtobufUtil.toResult(r);
      }
      builder.setResult(pbr);
    }
    // r.cells is null when a table.exists(get) call
    if (r != null && r.rawCells() != null) {
      quota.addGetResult(r);
    }
    return builder.build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  } finally {
    final MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null && region != null) {
      long blockBytesScanned = context != null ? context.getBlockBytesScanned() : 0;
      metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
        blockBytesScanned);
    }
    if (quota != null) {
      quota.close();
    }
  }
}

/**
 * Internal get implemented via a single-next scanner so the scanner's close can be deferred
 * to the RPC callback (directly, or batched through {@code closeCallBack} for multi()).
 * NOTE(review): assumes {@code context} is non-null whenever {@code closeCallBack} is null —
 * callers appear to guarantee this; confirm before reusing elsewhere.
 */
private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
  RpcCallContext context) throws IOException {
  region.prepareGet(get);
  boolean stale = region.getRegionInfo().getReplicaId() != 0;

  // This method is almost the same as HRegion#get.
  List<Cell> results = new ArrayList<>();
  long before = EnvironmentEdgeManager.currentTime();
  // pre-get CP hook
  if (region.getCoprocessorHost() != null) {
    if (region.getCoprocessorHost().preGet(get, results)) {
      region.metricsUpdateForGet(results, before);
      return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
        stale);
    }
  }
  Scan scan = new Scan(get);
  if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
    scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
  }
  RegionScannerImpl scanner = null;
  try {
    scanner = region.getScanner(scan);
    scanner.next(results);
  } finally {
    if (scanner != null) {
      if (closeCallBack == null) {
        // If there is a context then the scanner can be added to the current
        // RpcCallContext. The rpc callback will take care of closing the
        // scanner, for eg in case
        // of get()
        context.setCallBack(scanner);
      } else {
        // The call is from multi() where the results from the get() are
        // aggregated and then send out to the
        // rpc. The rpccall back will close all such scanners created as part
        // of multi().
        closeCallBack.addScanner(scanner);
      }
    }
  }

  // post-get CP hook
  if (region.getCoprocessorHost() != null) {
    region.getCoprocessorHost().postGet(get, results);
  }
  region.metricsUpdateForGet(results, before);

  return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
}

/**
 * Warn (and optionally reject, per configuration) multi requests whose total row count exceeds
 * the configured threshold. See HBASE-18023.
 */
private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
  int sum = 0;
  String firstRegionName = null;
  for (RegionAction regionAction : request.getRegionActionList()) {
    if (sum == 0) {
      firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
    }
    sum += regionAction.getActionCount();
  }
  if (sum > rowSizeWarnThreshold) {
    LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
      + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
      + RpcServer.getRequestUserName().orElse(null) + "/"
      + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
    if (rejectRowsWithSizeOverThreshold) {
      throw new ServiceException(
        "Rejecting large batch operation for current batch with firstRegionName: "
          + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
          + rowSizeWarnThreshold);
    }
  }
}

/**
 * Record a whole-RegionAction failure in the response and keep the shared cell scanner in sync
 * by skipping the cells that belonged to the failed action's mutations.
 */
private void failRegionAction(MultiResponse.Builder responseBuilder,
  RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
  CellScanner cellScanner, Throwable error) {
  rpcServer.getMetrics().exception(error);
  regionActionResultBuilder.setException(ResponseConverter.buildException(error));
  responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
  // All Mutations in this RegionAction not executed as we can not see the Region online here
  // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
  // corresponding to these Mutations.
  if (cellScanner != null) {
    skipCellsForMutations(regionAction.getActionList(), cellScanner);
  }
}

/** @return true if the action carries the replication marker attribute (PUT/DELETE only). */
private boolean isReplicationRequest(Action action) {
  // replication request can only be put or delete.
  if (!action.hasMutation()) {
    return false;
  }
  MutationProto mutation = action.getMutation();
  MutationType type = mutation.getMutateType();
  if (type != MutationType.PUT && type != MutationType.DELETE) {
    return false;
  }
  // replication will set a special attribute so we can make use of it to decide whether a request
  // is for replication.
  return mutation.getAttributeList().stream().map(p -> p.getName())
    .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
}

/**
 * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
 * @param rpcc the RPC controller
 * @param request the multi request
 */
@Override
public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
  throws ServiceException {
  try {
    checkOpen();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }

  checkBatchSizeAndLogLargeSize(request);

  // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
  // It is also the conduit via which we pass back data.
  HBaseRpcController controller = (HBaseRpcController) rpcc;
  CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
  if (controller != null) {
    controller.setCellScanner(null);
  }

  long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;

  MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
  RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
  this.rpcMultiRequestCount.increment();
  this.requestCount.increment();
  ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();

  // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
  // following logic is for backward compatibility as old clients still use
  // MultiRequest#condition in case of checkAndMutate with RowMutations.
  if (request.hasCondition()) {
    if (request.getRegionActionList().isEmpty()) {
      // If the region action list is empty, do nothing.
      responseBuilder.setProcessed(true);
      return responseBuilder.build();
    }

    RegionAction regionAction = request.getRegionAction(0);

    // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
    // we can assume regionAction.getAtomic() is true here.
    assert regionAction.getAtomic();

    OperationQuota quota;
    HRegion region;
    RegionSpecifier regionSpecifier = regionAction.getRegion();

    try {
      region = getRegion(regionSpecifier);
      quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
    } catch (IOException e) {
      failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
      return responseBuilder.build();
    }

    try {
      boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
      // We only allow replication in standby state and it will not set the atomic flag.
      if (rejectIfFromClient) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
          new DoNotRetryIOException(
            region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
        return responseBuilder.build();
      }

      try {
        CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
          cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
        responseBuilder.setProcessed(result.isSuccess());
        ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
          ClientProtos.ResultOrException.newBuilder();
        for (int i = 0; i < regionAction.getActionCount(); i++) {
          // To unify the response format with doNonAtomicRegionMutation and read through
          // client's AsyncProcess we have to add an empty result instance per operation
          resultOrExceptionOrBuilder.clear();
          resultOrExceptionOrBuilder.setIndex(i);
          regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
        }
      } catch (IOException e) {
        rpcServer.getMetrics().exception(e);
        // As it's an atomic operation with a condition, we may expect it's a global failure.
        regionActionResultBuilder.setException(ResponseConverter.buildException(e));
      }
    } finally {
      quota.close();
    }

    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
    ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
    if (regionLoadStats != null) {
      responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
        .addRegion(regionSpecifier).addStat(regionLoadStats).build());
    }
    return responseBuilder.build();
  }

  // this will contain all the cells that we need to return. It's created later, if needed.
  List<CellScannable> cellsToReturn = null;
  RegionScannersCloseCallBack closeCallBack = null;
  RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
  Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
    new HashMap<>(request.getRegionActionCount());

  for (RegionAction regionAction : request.getRegionActionList()) {
    OperationQuota quota;
    HRegion region;
    RegionSpecifier regionSpecifier = regionAction.getRegion();
    // The result builder is reused across iterations; clear before each region.
    regionActionResultBuilder.clear();

    try {
      region = getRegion(regionSpecifier);
      quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
    } catch (IOException e) {
      failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
      continue; // For this region it's a failure.
    }

    try {
      boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);

      if (regionAction.hasCondition()) {
        // We only allow replication in standby state and it will not set the atomic flag.
        if (rejectIfFromClient) {
          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
            new DoNotRetryIOException(
              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
          continue;
        }

        try {
          ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
            ClientProtos.ResultOrException.newBuilder();
          if (regionAction.getActionCount() == 1) {
            // Single-mutation checkAndMutate.
            CheckAndMutateResult result =
              checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
                regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
            regionActionResultBuilder.setProcessed(result.isSuccess());
            resultOrExceptionOrBuilder.setIndex(0);
            if (result.getResult() != null) {
              resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
            }
            regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
          } else {
            // RowMutations-style checkAndMutate over the whole action list.
            CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
              cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
            regionActionResultBuilder.setProcessed(result.isSuccess());
            for (int i = 0; i < regionAction.getActionCount(); i++) {
              if (i == 0 && result.getResult() != null) {
                // Set the result of the Increment/Append operations to the first element of the
                // ResultOrException list
                resultOrExceptionOrBuilder.setIndex(i);
                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
                  .setResult(ProtobufUtil.toResult(result.getResult())).build());
                continue;
              }
              // To unify the response format with doNonAtomicRegionMutation and read through
              // client's AsyncProcess we have to add an empty result instance per operation
              resultOrExceptionOrBuilder.clear();
              resultOrExceptionOrBuilder.setIndex(i);
              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
            }
          }
        } catch (IOException e) {
          rpcServer.getMetrics().exception(e);
          // As it's an atomic operation with a condition, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        }
      } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
        // We only allow replication in standby state and it will not set the atomic flag.
        if (rejectIfFromClient) {
          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
            new DoNotRetryIOException(
              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
          continue;
        }
        try {
          doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
            cellScanner, nonceGroup, spaceQuotaEnforcement);
          regionActionResultBuilder.setProcessed(true);
          // We no longer use MultiResponse#processed. Instead, we use
          // RegionActionResult#processed. This is for backward compatibility for old clients.
          responseBuilder.setProcessed(true);
        } catch (IOException e) {
          rpcServer.getMetrics().exception(e);
          // As it's atomic, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        }
      } else {
        if (
          rejectIfFromClient && regionAction.getActionCount() > 0
            && !isReplicationRequest(regionAction.getAction(0))
        ) {
          // fail if it is not a replication request
          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
            new DoNotRetryIOException(
              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
          continue;
        }
        // doNonAtomicRegionMutation manages the exception internally
        if (context != null && closeCallBack == null) {
          // An RpcCallBack that creates a list of scanners that needs to perform callBack
          // operation on completion of multiGets.
          // Set this only once
          closeCallBack = new RegionScannersCloseCallBack();
          context.setCallBack(closeCallBack);
        }
        cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
          regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
          spaceQuotaEnforcement);
      }
    } finally {
      quota.close();
    }

    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
    ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
    if (regionLoadStats != null) {
      regionStats.put(regionSpecifier, regionLoadStats);
    }
  }
  // Load the controller with the Cells to return.
  if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
    controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
  }

  MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
  for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
    builder.addRegion(stat.getKey());
    builder.addStat(stat.getValue());
  }
  responseBuilder.setRegionStatistics(builder);
  return responseBuilder.build();
}

/** Skip cells for every action in the list; keeps the shared CellScanner aligned. */
private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
  if (cellScanner == null) {
    return;
  }
  for (Action action : actions) {
    skipCellsForMutation(action, cellScanner);
  }
}

/** Advance the CellScanner past the cells associated with one (failed) mutation. */
private void skipCellsForMutation(Action action, CellScanner cellScanner) {
  if (cellScanner == null) {
    return;
  }
  try {
    if (action.hasMutation()) {
      MutationProto m = action.getMutation();
      if (m.hasAssociatedCellCount()) {
        for (int i = 0; i < m.getAssociatedCellCount(); i++) {
          cellScanner.advance();
        }
      }
    }
  } catch (IOException e) {
    // No need to handle these individual Mutation level issues. Any way this entire RegionAction
    // marked as failed as we could not see the Region here. At client side the top level
    // RegionAction exception will be considered first.
    LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
  }
}

/**
 * Mutate data in a table.
 * @param rpcc the RPC controller
 * @param request the mutate request
 */
@Override
public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
  throws ServiceException {
  // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
  // It is also the conduit via which we pass back data.
  HBaseRpcController controller = (HBaseRpcController) rpcc;
  CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
  OperationQuota quota = null;
  RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
  // Clear scanner so we are not holding on to reference across call.
  if (controller != null) {
    controller.setCellScanner(null);
  }
  try {
    checkOpen();
    requestCount.increment();
    rpcMutateRequestCount.increment();
    HRegion region = getRegion(request.getRegion());
    rejectIfInStandByState(region);
    MutateResponse.Builder builder = MutateResponse.newBuilder();
    MutationProto mutation = request.getMutation();
    if (!region.getRegionInfo().isMetaRegion()) {
      server.getMemStoreFlusher().reclaimMemStoreMemory();
    }
    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
    quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
    ActivePolicyEnforcement spaceQuotaEnforcement =
      getSpaceQuotaManager().getActiveEnforcements();

    if (request.hasCondition()) {
      // checkAndMutate path.
      CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
        request.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
      builder.setProcessed(result.isSuccess());
      boolean clientCellBlockSupported = isClientCellBlockSupport(context);
      addResult(builder, result.getResult(), controller, clientCellBlockSupported);
      if (clientCellBlockSupported) {
        addSize(context, result.getResult());
      }
    } else {
      Result r = null;
      Boolean processed = null;
      MutationType type = mutation.getMutateType();
      switch (type) {
        case APPEND:
          // TODO: this doesn't actually check anything.
          r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
            context);
          break;
        case INCREMENT:
          // TODO: this doesn't actually check anything.
          r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
            context);
          break;
        case PUT:
          put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
          processed = Boolean.TRUE;
          break;
        case DELETE:
          delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
          processed = Boolean.TRUE;
          break;
        default:
          throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
      }
      if (processed != null) {
        builder.setProcessed(processed);
      }
      boolean clientCellBlockSupported = isClientCellBlockSupport(context);
      addResult(builder, r, controller, clientCellBlockSupported);
      if (clientCellBlockSupported) {
        addSize(context, r);
      }
    }
    return builder.build();
  } catch (IOException ie) {
    server.checkFileSystem();
    throw new ServiceException(ie);
  } finally {
    if (quota != null) {
      quota.close();
    }
  }
}

/** Apply a single Put after cell-size and space-quota checks; records put latency. */
private void put(HRegion region, OperationQuota quota, MutationProto mutation,
  CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
  long before = EnvironmentEdgeManager.currentTime();
  Put put = ProtobufUtil.toPut(mutation, cellScanner);
  checkCellSizeLimit(region, put);
  spaceQuota.getPolicyEnforcement(region).check(put);
  quota.addMutation(put);
  region.put(put);

  MetricsRegionServer metricsRegionServer = server.getMetrics();
  if (metricsRegionServer != null) {
    long after = EnvironmentEdgeManager.currentTime();
    metricsRegionServer.updatePut(region, after - before);
  }
}

/** Apply a single Delete after cell-size and space-quota checks; records delete latency. */
private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
  CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
  long before = EnvironmentEdgeManager.currentTime();
  Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
  checkCellSizeLimit(region, delete);
  spaceQuota.getPolicyEnforcement(region).check(delete);
  quota.addMutation(delete);
  region.delete(delete);

  MetricsRegionServer metricsRegionServer = server.getMetrics();
  if (metricsRegionServer != null) {
    long after = EnvironmentEdgeManager.currentTime();
    metricsRegionServer.updateDelete(region, after - before);
  }
}

/**
 * Execute a single-mutation checkAndMutate, honoring pre/postCheckAndMutate coprocessor hooks
 * (a non-null preCheckAndMutate result bypasses the region call entirely) and updating metrics.
 */
private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
  MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
  ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
  long before = EnvironmentEdgeManager.currentTime();
  long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
  CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
  long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
  checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
  spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
  quota.addMutation((Mutation) checkAndMutate.getAction());

  CheckAndMutateResult result = null;
  if (region.getCoprocessorHost() != null) {
    result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
  }
  if (result == null) {
    result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
    if (region.getCoprocessorHost() != null) {
      result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
    }
  }
  MetricsRegionServer metricsRegionServer = server.getMetrics();
  if (metricsRegionServer != null) {
    long after = EnvironmentEdgeManager.currentTime();
    long blockBytesScanned =
      context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
    metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned);

    MutationType type = mutation.getMutateType();
    switch (type) {
      case PUT:
        metricsRegionServer.updateCheckAndPut(region, after - before);
        break;
      case DELETE:
        metricsRegionServer.updateCheckAndDelete(region, after - before);
        break;
      default:
        break;
    }
  }
  return result;
}

// This is used to keep compatible with the old client implementation. Consider remove it if we
// decide to drop the support of the client that still sends close request to a region scanner
// which has already been exhausted.
3059 @Deprecated 3060 private static final IOException SCANNER_ALREADY_CLOSED = new IOException() { 3061 3062 private static final long serialVersionUID = -4305297078988180130L; 3063 3064 @Override 3065 public synchronized Throwable fillInStackTrace() { 3066 return this; 3067 } 3068 }; 3069 3070 private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException { 3071 String scannerName = toScannerName(request.getScannerId()); 3072 RegionScannerHolder rsh = this.scanners.get(scannerName); 3073 if (rsh == null) { 3074 // just ignore the next or close request if scanner does not exists. 3075 if (closedScanners.getIfPresent(scannerName) != null) { 3076 throw SCANNER_ALREADY_CLOSED; 3077 } else { 3078 LOG.warn("Client tried to access missing scanner " + scannerName); 3079 throw new UnknownScannerException( 3080 "Unknown scanner '" + scannerName + "'. This can happen due to any of the following " 3081 + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " 3082 + "long wait between consecutive client checkins, c) Server may be closing down, " 3083 + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " 3084 + "possible fix would be increasing the value of" 3085 + "'hbase.client.scanner.timeout.period' configuration."); 3086 } 3087 } 3088 rejectIfInStandByState(rsh.r); 3089 RegionInfo hri = rsh.s.getRegionInfo(); 3090 // Yes, should be the same instance 3091 if (server.getOnlineRegion(hri.getRegionName()) != rsh.r) { 3092 String msg = "Region has changed on the scanner " + scannerName + ": regionName=" 3093 + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r; 3094 LOG.warn(msg + ", closing..."); 3095 scanners.remove(scannerName); 3096 try { 3097 rsh.s.close(); 3098 } catch (IOException e) { 3099 LOG.warn("Getting exception closing " + scannerName, e); 3100 } finally { 3101 try { 3102 server.getLeaseManager().cancelLease(scannerName); 3103 } catch (LeaseException e) { 3104 LOG.warn("Getting 
exception closing " + scannerName, e); 3105 } 3106 } 3107 throw new NotServingRegionException(msg); 3108 } 3109 return rsh; 3110 } 3111 3112 /** 3113 * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder 3114 * value. 3115 */ 3116 private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request, 3117 ScanResponse.Builder builder) throws IOException { 3118 HRegion region = getRegion(request.getRegion()); 3119 rejectIfInStandByState(region); 3120 ClientProtos.Scan protoScan = request.getScan(); 3121 boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); 3122 Scan scan = ProtobufUtil.toScan(protoScan); 3123 // if the request doesn't set this, get the default region setting. 3124 if (!isLoadingCfsOnDemandSet) { 3125 scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); 3126 } 3127 3128 if (!scan.hasFamilies()) { 3129 // Adding all families to scanner 3130 for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) { 3131 scan.addFamily(family); 3132 } 3133 } 3134 if (region.getCoprocessorHost() != null) { 3135 // preScannerOpen is not allowed to return a RegionScanner. 
Only post hook can create a 3136 // wrapper for the core created RegionScanner 3137 region.getCoprocessorHost().preScannerOpen(scan); 3138 } 3139 RegionScannerImpl coreScanner = region.getScanner(scan); 3140 Shipper shipper = coreScanner; 3141 RegionScanner scanner = coreScanner; 3142 try { 3143 if (region.getCoprocessorHost() != null) { 3144 scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); 3145 } 3146 } catch (Exception e) { 3147 // Although region coprocessor is for advanced users and they should take care of the 3148 // implementation to not damage the HBase system, closing the scanner on exception here does 3149 // not have any bad side effect, so let's do it 3150 scanner.close(); 3151 throw e; 3152 } 3153 long scannerId = scannerIdGenerator.generateNewScannerId(); 3154 builder.setScannerId(scannerId); 3155 builder.setMvccReadPoint(scanner.getMvccReadPoint()); 3156 builder.setTtl(scannerLeaseTimeoutPeriod); 3157 String scannerName = toScannerName(scannerId); 3158 3159 boolean fullRegionScan = 3160 !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region); 3161 3162 return new Pair<String, RegionScannerHolder>(scannerName, 3163 addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan)); 3164 } 3165 3166 /** 3167 * The returned String is used as key doing look up of outstanding Scanners in this Servers' 3168 * this.scanners, the Map of outstanding scanners and their current state. 3169 * @param scannerId A scanner long id. 3170 * @return The long id as a String. 3171 */ 3172 private static String toScannerName(long scannerId) { 3173 return Long.toString(scannerId); 3174 } 3175 3176 private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh) 3177 throws OutOfOrderScannerNextException { 3178 // if nextCallSeq does not match throw Exception straight away. This needs to be 3179 // performed even before checking of Lease. 
    // See HBASE-5974
    if (request.hasNextCallSeq()) {
      long callSeq = request.getNextCallSeq();
      if (!rsh.incNextCallSeq(callSeq)) {
        throw new OutOfOrderScannerNextException(
          "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: "
            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
      }
    }
  }

  /**
   * Return a previously-removed scanner lease to the lease manager.
   */
  private void addScannerLeaseBack(LeaseManager.Lease lease) {
    try {
      server.getLeaseManager().addLease(lease);
    } catch (LeaseStillHeldException e) {
      // should not happen as the scanner id is unique.
      throw new AssertionError(e);
    }
  }

  /**
   * Compute the absolute wall-clock deadline for the current scan RPC, or -1 when no time limit
   * should be enforced (e.g. when the client cannot handle heartbeat messages).
   */
  // visible for testing only
  long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller,
    boolean allowHeartbeatMessages) {
    // Set the time limit to be half of the more restrictive timeout value (one of the
    // timeout values must be positive). In the event that both values are positive, the
    // more restrictive of the two is used to calculate the limit.
    if (allowHeartbeatMessages) {
      long now = EnvironmentEdgeManager.currentTime();
      long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now);
      if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) {
        long timeLimitDelta;
        if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) {
          timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout);
        } else {
          timeLimitDelta =
            scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout;
        }

        // Use half of whichever timeout value was more restrictive... But don't allow
        // the time limit to be less than the allowable minimum (could cause an
        // immediate timeout before scanning any data).
        timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
        return now + timeLimitDelta;
      }
    }
    // Default value of timeLimit is negative to indicate no timeLimit should be
    // enforced.
    return -1L;
  }

  /**
   * How much of the RPC's timeout budget is left at {@code now}, preferring the controller's
   * per-call timeout over the server-wide rpcTimeout; -1 when neither is configured.
   */
  private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) {
    long timeout;
    if (controller != null && controller.getCallTimeout() > 0) {
      timeout = controller.getCallTimeout();
    } else if (rpcTimeout > 0) {
      timeout = rpcTimeout;
    } else {
      return -1;
    }
    if (call != null) {
      // Subtract time already spent queued/processing since the call was received.
      timeout -= (now - call.getReceiveTime());
    }
    // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high.
    // return minimum value here in that case so we count this in calculating the final delta.
    return Math.max(minimumScanTimeLimitDelta, timeout);
  }

  /**
   * If the scan has reached its row limit, mark the response as having no more results.
   */
  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
    ScannerContext scannerContext, ScanResponse.Builder builder) {
    if (numOfCompleteRows >= limitOfRows) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows
          + " scannerContext: " + scannerContext);
      }
      builder.setMoreResults(false);
    }
  }

  /**
   * Core server-side scan loop: repeatedly calls nextRaw on the region scanner, accumulating
   * Results into {@code results} until a size/time/row limit is hit or the region is exhausted,
   * and fills the response builder (moreResults, heartbeat, cursor, scan metrics).
   */
  // return whether we have more results in region.
  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
    long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
    ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
    HRegion region = rsh.r;
    RegionScanner scanner = rsh.s;
    long maxResultSize;
    if (scanner.getMaxResultSize() > 0) {
      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
    } else {
      maxResultSize = maxQuotaResultSize;
    }
    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
    // arbitrary 32. TODO: keep record of general size of results being returned.
    ArrayList<Cell> values = new ArrayList<>(32);
    region.startRegionOperation(Operation.SCAN);
    long before = EnvironmentEdgeManager.currentTime();
    // Used to check if we've matched the row limit set on the Scan
    int numOfCompleteRows = 0;
    // Count of times we call nextRaw; can be > numOfCompleteRows.
    int numOfNextRawCalls = 0;
    try {
      int numOfResults = 0;
      synchronized (scanner) {
        // Results from non-default replicas are marked stale for the client.
        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
        boolean clientHandlesPartials =
          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
        boolean clientHandlesHeartbeats =
          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();

        // On the server side we must ensure that the correct ordering of partial results is
        // returned to the client to allow them to properly reconstruct the partial results.
        // If the coprocessor host is adding to the result list, we cannot guarantee the
        // correct ordering of partial results and so we prevent partial results from being
        // formed.
        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
        boolean moreRows = false;

        // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
        // certain time threshold on the server. When the time threshold is exceeded, the
        // server stops the scan and sends back whatever Results it has accumulated within
        // that time period (may be empty). Since heartbeat messages have the potential to
        // create partial Results (in the event that the timeout occurs in the middle of a
        // row), we must only generate heartbeat messages when the client can handle both
        // heartbeats AND partials
        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;

        long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);

        final LimitScope sizeScope =
          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
        final LimitScope timeScope =
          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;

        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();

        // Configure with limits for this RPC. Set keep progress true since size progress
        // towards size limit should be kept between calls to nextRaw
        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
        // of heap size occupied by cells(being read). Cell data means its key and value parts.
        // maxQuotaResultSize - max results just from server side configuration and quotas, without
        // user's specified max. We use this for evaluating limits based on blocks (not cells).
        // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
        // cell or block size from maximum here so we adhere to total limits of request.
        // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
        // have accumulated block bytes. If not, this will be 0 for block size.
        long maxCellSize = maxResultSize;
        long maxBlockSize = maxQuotaResultSize;
        if (rpcCall != null) {
          // Charge what the coprocessor pre-hook already consumed against this call's budget.
          maxBlockSize -= rpcCall.getBlockBytesScanned();
          maxCellSize -= rpcCall.getResponseCellSize();
        }

        contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
        contextBuilder.setBatchLimit(scanner.getBatch());
        contextBuilder.setTimeLimit(timeScope, timeLimit);
        contextBuilder.setTrackMetrics(trackMetrics);
        ScannerContext scannerContext = contextBuilder.build();
        boolean limitReached = false;
        while (numOfResults < maxResults) {
          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
          // batch limit is a limit on the number of cells per Result. Thus, if progress is
          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
          // reset the batch progress between nextRaw invocations since we don't want the
          // batch progress from previous calls to affect future calls
          scannerContext.setBatchProgress(0);
          assert values.isEmpty();

          // Collect values to be returned here
          moreRows = scanner.nextRaw(values, scannerContext);
          if (rpcCall == null) {
            // When there is no RpcCallContext,copy EC to heap, then the scanner would close,
            // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
            // buffers.See more details in HBASE-26036.
            CellUtil.cloneIfNecessary(values);
          }
          numOfNextRawCalls++;

          if (!values.isEmpty()) {
            if (limitOfRows > 0) {
              // First we need to check if the last result is partial and we have a row change. If
              // so then we need to increase the numOfCompleteRows.
              if (results.isEmpty()) {
                if (
                  rsh.rowOfLastPartialResult != null
                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              } else {
                Result lastResult = results.get(results.size() - 1);
                if (
                  lastResult.mayHaveMoreCellsInRow()
                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              }
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
            results.add(r);
            numOfResults++;
            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
              // A complete row was just finished; count it against the row limit.
              numOfCompleteRows++;
              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
          } else if (!moreRows && !results.isEmpty()) {
            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
            // last result is false. Otherwise it's possible that: the first nextRaw returned
            // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
            // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
            // return with moreRows=false, which means moreResultsInRegion would be false, it will
            // be a contradictory state (HBASE-21206).
            int lastIdx = results.size() - 1;
            Result r = results.get(lastIdx);
            if (r.mayHaveMoreCellsInRow()) {
              results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
            }
          }
          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
          boolean resultsLimitReached = numOfResults >= maxResults;
          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

          if (limitReached || !moreRows) {
            // With block size limit, we may exceed size limit without collecting any results.
            // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
            // or cursor if results were collected, for example for cell size or heap size limits.
            boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
            // We only want to mark a ScanResponse as a heartbeat message in the event that
            // there are more values to be read server side. If there aren't more values,
            // marking it as a heartbeat is wasteful because the client will need to issue
            // another ScanRequest only to realize that they already have all the values
            if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
              // Heartbeat messages occur when the time limit has been reached, or size limit has
              // been reached before collecting any results. This can happen for heavily filtered
              // scans which scan over too many blocks.
              builder.setHeartbeatMessage(true);
              if (rsh.needCursor) {
                Cell cursorCell = scannerContext.getLastPeekedCell();
                if (cursorCell != null) {
                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
                }
              }
            }
            break;
          }
          values.clear();
        }
        if (rpcCall != null) {
          rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
        }
        builder.setMoreResultsInRegion(moreRows);
        // Check to see if the client requested that we track metrics server side. If the
        // client requested metrics, retrieve the metrics from the scanner context.
        if (trackMetrics) {
          // rather than increment yet another counter in StoreScanner, just set the value here
          // from block size progress before writing into the response
          scannerContext.getMetrics().countOfBlockBytesScanned
            .set(scannerContext.getBlockSizeProgress());
          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();

          for (Entry<String, Long> entry : metrics.entrySet()) {
            pairBuilder.setName(entry.getKey());
            pairBuilder.setValue(entry.getValue());
            metricBuilder.addMetrics(pairBuilder.build());
          }

          builder.setScanMetrics(metricBuilder.build());
        }
      }
    } finally {
      region.closeRegionOperation();
      // Update serverside metrics, even on error.
      long end = EnvironmentEdgeManager.currentTime();
      long responseCellSize = 0;
      long blockBytesScanned = 0;
      if (rpcCall != null) {
        responseCellSize = rpcCall.getResponseCellSize();
        blockBytesScanned = rpcCall.getBlockBytesScanned();
      }
      region.getMetrics().updateScanTime(end - before);
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned);
        metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls);
      }
    }
    // coprocessor postNext hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
    }
  }

  /**
   * Scan data in a table.
   * @param controller the RPC controller
   * @param request the scan request
   */
  @Override
  public ScanResponse scan(final RpcController controller, final ScanRequest request)
    throws ServiceException {
    if (controller != null && !(controller instanceof HBaseRpcController)) {
      throw new UnsupportedOperationException(
        "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
    }
    if (!request.hasScannerId() && !request.hasScan()) {
      throw new ServiceException(
        new DoNotRetryIOException("Missing required input: scannerId or scan"));
    }
    try {
      checkOpen();
    } catch (IOException e) {
      // Server is going down: best-effort cancel of the referenced scanner's lease before
      // propagating the failure to the client.
      if (request.hasScannerId()) {
        String scannerName = toScannerName(request.getScannerId());
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Server shutting down and client tried to access missing scanner " + scannerName);
        }
        final LeaseManager leaseManager = server.getLeaseManager();
        if (leaseManager != null) {
          try {
            leaseManager.cancelLease(scannerName);
          } catch (LeaseException le) {
            // No problem, ignore
            if (LOG.isTraceEnabled()) {
              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
            }
          }
        }
      }
      throw new ServiceException(e);
    }
    requestCount.increment();
    rpcScanRequestCount.increment();
    RegionScannerHolder rsh;
    ScanResponse.Builder builder = ScanResponse.newBuilder();
    String scannerName;
    try {
      if (request.hasScannerId()) {
        // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
        // for more details.
        long scannerId = request.getScannerId();
        builder.setScannerId(scannerId);
        scannerName = toScannerName(scannerId);
        rsh = getRegionScanner(request);
      } else {
        Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder);
        scannerName = scannerNameAndRSH.getFirst();
        rsh = scannerNameAndRSH.getSecond();
      }
    } catch (IOException e) {
      if (e == SCANNER_ALREADY_CLOSED) {
        // Now we will close scanner automatically if there are no more results for this region but
        // the old client will still send a close request to us. Just ignore it and return.
        return builder.build();
      }
      throw new ServiceException(e);
    }
    if (rsh.fullRegionScan) {
      rpcFullScanRequestCount.increment();
    }
    HRegion region = rsh.r;
    LeaseManager.Lease lease;
    try {
      // Remove lease while its being processed in server; protects against case
      // where processing of request takes > lease expiration time. or null if none found.
      lease = server.getLeaseManager().removeLease(scannerName);
    } catch (LeaseException e) {
      throw new ServiceException(e);
    }
    if (request.hasRenew() && request.getRenew()) {
      // add back and return
      addScannerLeaseBack(lease);
      try {
        checkScanNextCallSeq(request, rsh);
      } catch (OutOfOrderScannerNextException e) {
        throw new ServiceException(e);
      }
      return builder.build();
    }
    OperationQuota quota;
    try {
      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
    } catch (IOException e) {
      // Quota rejection: give the lease back so the scanner stays alive for the retry.
      addScannerLeaseBack(lease);
      throw new ServiceException(e);
    }
    try {
      checkScanNextCallSeq(request, rsh);
    } catch (OutOfOrderScannerNextException e) {
      addScannerLeaseBack(lease);
      throw new ServiceException(e);
    }
    // Now we have increased the next call sequence. If we give client an error, the retry will
    // never success. So we'd better close the scanner and return a DoNotRetryIOException to client
    // and then client will try to open a new scanner.
    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
    int rows; // this is scan.getCaching
    if (request.hasNumberOfRows()) {
      rows = request.getNumberOfRows();
    } else {
      rows = closeScanner ? 0 : 1;
    }
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    // now let's do the real scan.
    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
    RegionScanner scanner = rsh.s;
    // this is the limit of rows for this scan, if we the number of rows reach this value, we will
    // close the scanner.
    int limitOfRows;
    if (request.hasLimitOfRows()) {
      limitOfRows = request.getLimitOfRows();
    } else {
      limitOfRows = -1;
    }
    boolean scannerClosed = false;
    try {
      List<Result> results = new ArrayList<>(Math.min(rows, 512));
      if (rows > 0) {
        boolean done = false;
        // Call coprocessor. Get region info from scanner.
        if (region.getCoprocessorHost() != null) {
          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
          if (!results.isEmpty()) {
            for (Result r : results) {
              // add cell size from CP results so we can track response size and update limits
              // when calling scan below if !done. We'll also have tracked block size if the CP
              // got results from hbase, since StoreScanner tracks that for all calls automatically.
              addSize(rpcCall, r);
            }
          }
          if (bypass != null && bypass.booleanValue()) {
            done = true;
          }
        }
        if (!done) {
          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
            results, builder, rpcCall);
        } else {
          builder.setMoreResultsInRegion(!results.isEmpty());
        }
      } else {
        // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
        builder.setMoreResultsInRegion(true);
      }

      quota.addScanResult(results);
      addResults(builder, results, (HBaseRpcController) controller,
        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
        isClientCellBlockSupport(rpcCall));
      if (scanner.isFilterDone() && results.isEmpty()) {
        // If the scanner's filter - if any - is done with the scan
        // only set moreResults to false if the results is empty. This is used to keep compatible
        // with the old scan implementation where we just ignore the returned results if moreResults
        // is false. Can remove the isEmpty check after we get rid of the old implementation.
        builder.setMoreResults(false);
      }
      // Later we may close the scanner depending on this flag so here we need to make sure that we
      // have already set this flag.
      assert builder.hasMoreResultsInRegion();
      // we only set moreResults to false in the above code, so set it to true if we haven't set it
      // yet.
      if (!builder.hasMoreResults()) {
        builder.setMoreResults(true);
      }
      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
        // Record the last cell of the last result if it is a partial result
        // We need this to calculate the complete rows we have returned to client as the
        // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
        // current row. We may filter out all the remaining cells for the current row and just
        // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
        // check for row change.
        Result lastResult = results.get(results.size() - 1);
        if (lastResult.mayHaveMoreCellsInRow()) {
          rsh.rowOfLastPartialResult = lastResult.getRow();
        } else {
          rsh.rowOfLastPartialResult = null;
        }
      }
      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
        scannerClosed = true;
        closeScanner(region, scanner, scannerName, rpcCall);
      }

      // There's no point returning to a timed out client. Throwing ensures scanner is closed
      if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) {
        throw new TimeoutIOException("Client deadline exceeded, cannot return results");
      }

      return builder.build();
    } catch (IOException e) {
      try {
        // scanner is closed here
        scannerClosed = true;
        // The scanner state might be left in a dirty state, so we will tell the Client to
        // fail this RPC and close the scanner while opening up another one from the start of
        // row that the client has last seen.
        closeScanner(region, scanner, scannerName, rpcCall);

        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
        // used in two different semantics.
        // (1) The first is to close the client scanner and bubble up the exception all the way
        // to the application. This is preferred when the exception is really un-recoverable
        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
        // bucket usually.
        // (2) Second semantics is to close the current region scanner only, but continue the
        // client scanner by overriding the exception. This is usually UnknownScannerException,
        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
        // application-level ClientScanner has to continue without bubbling up the exception to
        // the client. See ClientScanner code to see how it deals with these special exceptions.
        if (e instanceof DoNotRetryIOException) {
          throw e;
        }

        // If it is a FileNotFoundException, wrap as a
        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
        if (e instanceof FileNotFoundException) {
          throw new DoNotRetryIOException(e);
        }

        // We closed the scanner already. Instead of throwing the IOException, and client
        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
        // a special exception to save an RPC.
        if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
          // 1.4.0+ clients know how to handle
          throw new ScannerResetException("Scanner is closed on the server-side", e);
        } else {
          // older clients do not know about SRE. Just throw USE, which they will handle
          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
            + " scanner state for clients older than 1.3.", e);
        }
      } catch (IOException ioe) {
        throw new ServiceException(ioe);
      }
    } finally {
      if (!scannerClosed) {
        // Adding resets expiration time on lease.
        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
        if (rpcCall != null) {
          rpcCall.setCallBack(rsh.shippedCallback);
        } else {
          // If context is null,here we call rsh.shippedCallback directly to reuse the logic in
          // rsh.shippedCallback to release the internal resources in rsh,and lease is also added
          // back to regionserver's LeaseManager in rsh.shippedCallback.
          runShippedCallback(rsh);
        }
      }
      quota.close();
    }
  }

  /**
   * Invoke the holder's shipped callback directly (used when there is no RpcCall to attach it
   * to); releases the holder's resources and re-adds the scanner lease.
   */
  private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
    assert rsh.shippedCallback != null;
    try {
      rsh.shippedCallback.run();
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
  }

  /**
   * Close a region scanner, honoring the preScannerClose coprocessor bypass, and remember the
   * scanner name in closedScanners so stale close requests from old clients are ignored.
   */
  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
    RpcCallContext context) throws IOException {
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preScannerClose(scanner)) {
        // bypass the actual close.
        return;
      }
    }
    RegionScannerHolder rsh = scanners.remove(scannerName);
    if (rsh != null) {
      if (context != null) {
        // Defer the actual close to the call's close callback so cell blocks are shipped first.
        context.setCallBack(rsh.closeCallBack);
      } else {
        rsh.s.close();
      }
      if (region.getCoprocessorHost() != null) {
        region.getCoprocessorHost().postScannerClose(scanner);
      }
      closedScanners.put(scannerName, scannerName);
    }
  }

  @Override
  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
    CoprocessorServiceRequest request) throws ServiceException {
    rpcPreCheck("execRegionServerService");
    return server.execRegionServerService(controller, request);
  }

  /**
   * Report the space-quota snapshots this region server currently knows about.
   */
  @Override
  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
    GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
    try {
      final RegionServerSpaceQuotaManager manager = server.getRegionServerSpaceQuotaManager();
      final GetSpaceQuotaSnapshotsResponse.Builder builder =
        GetSpaceQuotaSnapshotsResponse.newBuilder();
      if (manager != null) {
        final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
        for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
            .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
            .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build());
        }
      }
      return builder.build();
    } catch (Exception e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Evict block-cache entries for the requested regions, collecting per-region eviction stats;
   * failures are recorded per region rather than failing the whole request.
   */
  @Override
  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
    ClearRegionBlockCacheRequest request) throws ServiceException {
    rpcPreCheck("clearRegionBlockCache");
    ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder();
    CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
    List<HRegion> regions = getRegions(request.getRegionList(), stats);
    for (HRegion region : regions) {
      try {
        stats = stats.append(this.server.clearRegionBlockCache(region));
      } catch (Exception e) {
        stats.addException(region.getRegionInfo().getRegionName(), e);
      }
    }
    stats.withMaxCacheSize(server.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
    return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
  }

  /**
   * Schedule an AssignRegionHandler for every region in an OpenRegionRequest, caching table
   * descriptors in {@code tdCache} across regions of the same table.
   */
  private void executeOpenRegionProcedures(OpenRegionRequest request,
    Map<TableName, TableDescriptor> tdCache) {
    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableName tableName = regionInfo.getTable();
      TableDescriptor tableDesc = tdCache.get(tableName);
      if (tableDesc == null) {
        try {
          tableDesc = server.getTableDescriptors().get(regionInfo.getTable());
        } catch (IOException e) {
          // Here we do not fail the whole method since we also need deal with other
          // procedures, and we can not ignore this one, so we still schedule a
          // AssignRegionHandler and it will report back to master if we still can not get the
          // TableDescriptor.
          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
            regionInfo.getTable(), e);
        }
        if (tableDesc != null) {
          tdCache.put(tableName, tableDesc);
        }
      }
      if (regionOpenInfo.getFavoredNodesCount() > 0) {
        server.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
          regionOpenInfo.getFavoredNodesList());
      }
      long procId = regionOpenInfo.getOpenProcId();
      // Only submit the handler if this procedure id has not been seen before (dedup on retry).
      if (server.submitRegionProcedure(procId)) {
        server.getExecutorService().submit(
          AssignRegionHandler.create(server, regionInfo, procId, tableDesc, masterSystemTime));
      }
    }
  }

  private void executeCloseRegionProcedures(CloseRegionRequest request) {
    String encodedName;
    try {
      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
    } catch (DoNotRetryIOException e) {
      throw new UncheckedIOException("Should not happen", e);
    }
    ServerName destination = request.hasDestinationServer()
      ?
ProtobufUtil.toServerName(request.getDestinationServer()) 3866 : null; 3867 long procId = request.getCloseProcId(); 3868 boolean evictCache = request.getEvictCache(); 3869 if (server.submitRegionProcedure(procId)) { 3870 server.getExecutorService().submit( 3871 UnassignRegionHandler.create(server, encodedName, procId, false, destination, evictCache)); 3872 } 3873 } 3874 3875 private void executeProcedures(RemoteProcedureRequest request) { 3876 RSProcedureCallable callable; 3877 try { 3878 callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class) 3879 .getDeclaredConstructor().newInstance(); 3880 } catch (Exception e) { 3881 LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(), 3882 request.getProcId(), e); 3883 server.remoteProcedureComplete(request.getProcId(), e); 3884 return; 3885 } 3886 callable.init(request.getProcData().toByteArray(), server); 3887 LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId()); 3888 server.executeProcedure(request.getProcId(), callable); 3889 } 3890 3891 @Override 3892 @QosPriority(priority = HConstants.ADMIN_QOS) 3893 public ExecuteProceduresResponse executeProcedures(RpcController controller, 3894 ExecuteProceduresRequest request) throws ServiceException { 3895 try { 3896 checkOpen(); 3897 throwOnWrongStartCode(request); 3898 server.getRegionServerCoprocessorHost().preExecuteProcedures(); 3899 if (request.getOpenRegionCount() > 0) { 3900 // Avoid reading from the TableDescritor every time(usually it will read from the file 3901 // system) 3902 Map<TableName, TableDescriptor> tdCache = new HashMap<>(); 3903 request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache)); 3904 } 3905 if (request.getCloseRegionCount() > 0) { 3906 request.getCloseRegionList().forEach(this::executeCloseRegionProcedures); 3907 } 3908 if (request.getProcCount() > 0) { 3909 request.getProcList().forEach(this::executeProcedures); 3910 } 
3911 server.getRegionServerCoprocessorHost().postExecuteProcedures(); 3912 return ExecuteProceduresResponse.getDefaultInstance(); 3913 } catch (IOException e) { 3914 throw new ServiceException(e); 3915 } 3916 } 3917 3918 @Override 3919 public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller, 3920 GetAllBootstrapNodesRequest request) throws ServiceException { 3921 GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder(); 3922 server.getBootstrapNodes() 3923 .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server))); 3924 return builder.build(); 3925 } 3926}