001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.io.UncheckedIOException; 024import java.lang.reflect.InvocationTargetException; 025import java.net.BindException; 026import java.net.InetSocketAddress; 027import java.net.UnknownHostException; 028import java.nio.ByteBuffer; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.Collections; 032import java.util.HashMap; 033import java.util.Iterator; 034import java.util.List; 035import java.util.Map; 036import java.util.Map.Entry; 037import java.util.NavigableMap; 038import java.util.Set; 039import java.util.TreeSet; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.ConcurrentMap; 042import java.util.concurrent.TimeUnit; 043import java.util.concurrent.atomic.AtomicBoolean; 044import java.util.concurrent.atomic.AtomicLong; 045import java.util.concurrent.atomic.LongAdder; 046import org.apache.commons.lang3.mutable.MutableObject; 047import org.apache.hadoop.conf.Configuration; 048import 
org.apache.hadoop.fs.Path; 049import org.apache.hadoop.hbase.ByteBufferExtendedCell; 050import org.apache.hadoop.hbase.CacheEvictionStats; 051import org.apache.hadoop.hbase.CacheEvictionStatsBuilder; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.CellScannable; 054import org.apache.hadoop.hbase.CellScanner; 055import org.apache.hadoop.hbase.CellUtil; 056import org.apache.hadoop.hbase.CompareOperator; 057import org.apache.hadoop.hbase.DoNotRetryIOException; 058import org.apache.hadoop.hbase.DroppedSnapshotException; 059import org.apache.hadoop.hbase.HBaseIOException; 060import org.apache.hadoop.hbase.HConstants; 061import org.apache.hadoop.hbase.MultiActionResultTooLarge; 062import org.apache.hadoop.hbase.NotServingRegionException; 063import org.apache.hadoop.hbase.PrivateCellUtil; 064import org.apache.hadoop.hbase.RegionTooBusyException; 065import org.apache.hadoop.hbase.Server; 066import org.apache.hadoop.hbase.ServerName; 067import org.apache.hadoop.hbase.TableName; 068import org.apache.hadoop.hbase.UnknownScannerException; 069import org.apache.hadoop.hbase.client.Append; 070import org.apache.hadoop.hbase.client.ConnectionUtils; 071import org.apache.hadoop.hbase.client.Delete; 072import org.apache.hadoop.hbase.client.Durability; 073import org.apache.hadoop.hbase.client.Get; 074import org.apache.hadoop.hbase.client.Increment; 075import org.apache.hadoop.hbase.client.Mutation; 076import org.apache.hadoop.hbase.client.Put; 077import org.apache.hadoop.hbase.client.RegionInfo; 078import org.apache.hadoop.hbase.client.RegionReplicaUtil; 079import org.apache.hadoop.hbase.client.Result; 080import org.apache.hadoop.hbase.client.Row; 081import org.apache.hadoop.hbase.client.RowMutations; 082import org.apache.hadoop.hbase.client.Scan; 083import org.apache.hadoop.hbase.client.TableDescriptor; 084import org.apache.hadoop.hbase.client.VersionInfoUtil; 085import org.apache.hadoop.hbase.conf.ConfigurationObserver; 086import 
org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 087import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 088import org.apache.hadoop.hbase.exceptions.ScannerResetException; 089import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 090import org.apache.hadoop.hbase.filter.ByteArrayComparable; 091import org.apache.hadoop.hbase.io.TimeRange; 092import org.apache.hadoop.hbase.io.hfile.BlockCache; 093import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; 094import org.apache.hadoop.hbase.ipc.HBaseRpcController; 095import org.apache.hadoop.hbase.ipc.PriorityFunction; 096import org.apache.hadoop.hbase.ipc.QosPriority; 097import org.apache.hadoop.hbase.ipc.RpcCallContext; 098import org.apache.hadoop.hbase.ipc.RpcCallback; 099import org.apache.hadoop.hbase.ipc.RpcScheduler; 100import org.apache.hadoop.hbase.ipc.RpcServer; 101import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 102import org.apache.hadoop.hbase.ipc.RpcServerFactory; 103import org.apache.hadoop.hbase.ipc.RpcServerInterface; 104import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 105import org.apache.hadoop.hbase.ipc.ServerRpcController; 106import org.apache.hadoop.hbase.log.HBaseMarkers; 107import org.apache.hadoop.hbase.master.MasterRpcServices; 108import org.apache.hadoop.hbase.net.Address; 109import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 110import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; 111import org.apache.hadoop.hbase.quotas.OperationQuota; 112import org.apache.hadoop.hbase.quotas.QuotaUtil; 113import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 114import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 115import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 116import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; 117import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; 118import 
org.apache.hadoop.hbase.regionserver.Leases.Lease; 119import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; 120import org.apache.hadoop.hbase.regionserver.Region.Operation; 121import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 122import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 123import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler; 124import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; 125import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; 126import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; 127import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; 128import org.apache.hadoop.hbase.security.Superusers; 129import org.apache.hadoop.hbase.security.User; 130import org.apache.hadoop.hbase.security.access.AccessChecker; 131import org.apache.hadoop.hbase.security.access.Permission; 132import org.apache.hadoop.hbase.util.Bytes; 133import org.apache.hadoop.hbase.util.DNS; 134import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 135import org.apache.hadoop.hbase.util.Pair; 136import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 137import org.apache.hadoop.hbase.util.Strings; 138import org.apache.hadoop.hbase.wal.WAL; 139import org.apache.hadoop.hbase.wal.WALEdit; 140import org.apache.hadoop.hbase.wal.WALKey; 141import org.apache.hadoop.hbase.wal.WALSplitUtil; 142import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 143import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 144import org.apache.yetus.audience.InterfaceAudience; 145import org.slf4j.Logger; 146import org.slf4j.LoggerFactory; 147 148import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 149import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 150import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 151import 
org.apache.hbase.thirdparty.com.google.common.collect.Lists; 152import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 153import org.apache.hbase.thirdparty.com.google.protobuf.Message; 154import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 155import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 156import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 157import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 158import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 159 160import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 161import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 162import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 175import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 195import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 216import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 238import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 248 249/** 250 * Implements the regionserver RPC services. 251 */ 252@InterfaceAudience.Private 253@SuppressWarnings("deprecation") 254public class RSRpcServices implements HBaseRPCErrorHandler, 255 AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction, 256 ConfigurationObserver { 257 protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class); 258 259 /** RPC scheduler to use for the region server. */ 260 public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = 261 "hbase.region.server.rpc.scheduler.factory.class"; 262 263 /** RPC scheduler to use for the master. */ 264 public static final String MASTER_RPC_SCHEDULER_FACTORY_CLASS = 265 "hbase.master.rpc.scheduler.factory.class"; 266 267 /** 268 * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. 
   * This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
      "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Number of rows in a batch operation above which a warning will be logged.
   */
  static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
  /**
   * Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
   */
  static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;

  // NOTE(review): presumably toggles the server-side buffer reservoir for cell blocks —
  // confirm against RpcServer usage; only the key name is visible here.
  protected static final String RESERVOIR_ENABLED_KEY = "hbase.ipc.server.reservoir.enabled";

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Server to handle client requests.
  final RpcServerInterface rpcServer;
  final InetSocketAddress isa;

  @VisibleForTesting
  protected final HRegionServer regionServer;
  private final long maxScannerResultSize;

  // The reference to the priority extraction function
  private final PriorityFunction priority;

  // Generates unique ids for client scanners; assigned after construction (not final).
  private ScannerIdGenerator scannerIdGenerator;
  // Live scanners keyed by scanner name; entries are removed on close or lease expiry.
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
  // which may send next or close request to a region scanner which has already been exhausted. The
  // entries will be removed automatically after scannerLeaseTimeoutPeriod.
  private final Cache<String, String> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;

  // NOTE(review): looks like a CAS guard against concurrent clear-compaction-queue
  // requests — confirm against the clearCompactionQueues RPC handler (not in view).
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  // We want to vet all accesses at the point of entry itself; limiting scope of access checker
  // instance to only this class to prevent its use from spreading deeper into implementation.
  // Initialized in start() since AccessChecker needs ZKWatcher which is created by HRegionServer
  // after RSRpcServices constructor and before start() is called.
  // Initialized only if authorization is enabled, else remains null.
  protected AccessChecker accessChecker;

  /**
   * Services launched in RSRpcServices.
By default they are on but you can use the below 354 * booleans to selectively enable/disable either Admin or Client Service (Rare is the case 355 * where you would ever turn off one or the other). 356 */ 357 public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG = 358 "hbase.regionserver.admin.executorService"; 359 public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG = 360 "hbase.regionserver.client.executorService"; 361 362 /** 363 * An Rpc callback for closing a RegionScanner. 364 */ 365 private static final class RegionScannerCloseCallBack implements RpcCallback { 366 367 private final RegionScanner scanner; 368 369 public RegionScannerCloseCallBack(RegionScanner scanner) { 370 this.scanner = scanner; 371 } 372 373 @Override 374 public void run() throws IOException { 375 this.scanner.close(); 376 } 377 } 378 379 /** 380 * An Rpc callback for doing shipped() call on a RegionScanner. 381 */ 382 private class RegionScannerShippedCallBack implements RpcCallback { 383 384 private final String scannerName; 385 private final Shipper shipper; 386 private final Lease lease; 387 388 public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) { 389 this.scannerName = scannerName; 390 this.shipper = shipper; 391 this.lease = lease; 392 } 393 394 @Override 395 public void run() throws IOException { 396 this.shipper.shipped(); 397 // We're done. On way out re-add the above removed lease. The lease was temp removed for this 398 // Rpc call and we are at end of the call now. Time to add it back. 399 if (scanners.containsKey(scannerName)) { 400 if (lease != null) regionServer.leases.addLease(lease); 401 } 402 } 403 } 404 405 /** 406 * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on 407 * completion of multiGets. 
408 */ 409 static class RegionScannersCloseCallBack implements RpcCallback { 410 private final List<RegionScanner> scanners = new ArrayList<>(); 411 412 public void addScanner(RegionScanner scanner) { 413 this.scanners.add(scanner); 414 } 415 416 @Override 417 public void run() { 418 for (RegionScanner scanner : scanners) { 419 try { 420 scanner.close(); 421 } catch (IOException e) { 422 LOG.error("Exception while closing the scanner " + scanner, e); 423 } 424 } 425 } 426 } 427 428 /** 429 * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together. 430 */ 431 private static final class RegionScannerHolder { 432 433 private final AtomicLong nextCallSeq = new AtomicLong(0); 434 private final String scannerName; 435 private final RegionScanner s; 436 private final HRegion r; 437 private final RpcCallback closeCallBack; 438 private final RpcCallback shippedCallback; 439 private byte[] rowOfLastPartialResult; 440 private boolean needCursor; 441 442 public RegionScannerHolder(String scannerName, RegionScanner s, HRegion r, 443 RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor) { 444 this.scannerName = scannerName; 445 this.s = s; 446 this.r = r; 447 this.closeCallBack = closeCallBack; 448 this.shippedCallback = shippedCallback; 449 this.needCursor = needCursor; 450 } 451 452 public long getNextCallSeq() { 453 return nextCallSeq.get(); 454 } 455 456 public boolean incNextCallSeq(long currentSeq) { 457 // Use CAS to prevent multiple scan request running on the same scanner. 458 return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1); 459 } 460 } 461 462 /** 463 * Instantiated as a scanner lease. 
 * If the lease times out, the scanner is
   * closed
   */
  private class ScannerListener implements LeaseListener {
    private final String scannerName;

    ScannerListener(final String n) {
      this.scannerName = n;
    }

    @Override
    public void leaseExpired() {
      // The client stopped calling next()/close() within the lease period: remove the
      // scanner from the live map and close it, honoring coprocessor hooks.
      RegionScannerHolder rsh = scanners.remove(this.scannerName);
      if (rsh != null) {
        RegionScanner s = rsh.s;
        LOG.info("Scanner " + this.scannerName + " lease expired on region "
            + s.getRegionInfo().getRegionNameAsString());
        HRegion region = null;
        try {
          region = regionServer.getRegion(s.getRegionInfo().getRegionName());
          if (region != null && region.getCoprocessorHost() != null) {
            // Pre-close hook runs before the scanner is actually closed.
            region.getCoprocessorHost().preScannerClose(s);
          }
        } catch (IOException e) {
          LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
        } finally {
          // Close unconditionally, even if region lookup or the pre-close hook failed;
          // the post-close hook only fires when close() itself did not throw.
          try {
            s.close();
            if (region != null && region.getCoprocessorHost() != null) {
              region.getCoprocessorHost().postScannerClose(s);
            }
          } catch (IOException e) {
            LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
          }
        }
      } else {
        LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
          " scanner found, hence no chance to close that related scanner!");
      }
    }
  }

  // Wraps a protobuf Result into a ResultOrException carrying the action's index.
  private static ResultOrException getResultOrException(final ClientProtos.Result r,
      final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(r), index);
  }

  // Wraps an Exception into a ResultOrException carrying the action's index.
  private static ResultOrException getResultOrException(final Exception e, final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(e), index);
  }

  // Common tail: stamps the per-action index onto the builder and builds it.
  private static ResultOrException getResultOrException(
      final ResultOrException.Builder builder, final int index) {
    return builder.setIndex(index).build();
  }

  /**
   * Checks for the following pre-checks in order:
   * <ol>
   *
<li>RegionServer is running</li> 523 * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li> 524 * </ol> 525 * @param requestName name of rpc request. Used in reporting failures to provide context. 526 * @throws ServiceException If any of the above listed pre-check fails. 527 */ 528 private void rpcPreCheck(String requestName) throws ServiceException { 529 try { 530 checkOpen(); 531 requirePermission(requestName, Permission.Action.ADMIN); 532 } catch (IOException ioe) { 533 throw new ServiceException(ioe); 534 } 535 } 536 537 /** 538 * Starts the nonce operation for a mutation, if needed. 539 * @param mutation Mutation. 540 * @param nonceGroup Nonce group from the request. 541 * @return whether to proceed this mutation. 542 */ 543 private boolean startNonceOperation(final MutationProto mutation, long nonceGroup) 544 throws IOException { 545 if (regionServer.nonceManager == null || !mutation.hasNonce()) return true; 546 boolean canProceed = false; 547 try { 548 canProceed = regionServer.nonceManager.startOperation( 549 nonceGroup, mutation.getNonce(), regionServer); 550 } catch (InterruptedException ex) { 551 throw new InterruptedIOException("Nonce start operation interrupted"); 552 } 553 return canProceed; 554 } 555 556 /** 557 * Ends nonce operation for a mutation, if needed. 558 * @param mutation Mutation. 559 * @param nonceGroup Nonce group from the request. Always 0 in initial implementation. 560 * @param success Whether the operation for this nonce has succeeded. 
 */
  private void endNonceOperation(final MutationProto mutation,
      long nonceGroup, boolean success) {
    // Mirrors the guard in startNonceOperation: only act when a nonce manager
    // exists and the mutation actually carried a nonce.
    if (regionServer.nonceManager != null && mutation.hasNonce()) {
      regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
    }
  }

  // True when the client on the other end of this call can receive cell blocks
  // (cells shipped out-of-band rather than serialized into the protobuf response).
  private boolean isClientCellBlockSupport(RpcCallContext context) {
    return context != null && context.isClientCellBlockSupported();
  }

  /**
   * Copies a mutation Result into the response: as an out-of-band cell scanner when the
   * client supports cell blocks, otherwise inline as a full protobuf Result.
   */
  private void addResult(final MutateResponse.Builder builder, final Result result,
      final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
    if (result == null) return;
    if (clientCellBlockSupported) {
      builder.setResult(ProtobufUtil.toResultNoData(result));
      rpcc.setCellScanner(result.cellScanner());
    } else {
      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
      builder.setResult(pbr);
    }
  }

  /**
   * Copies scan Results into the response builder, using the cell-block path (per-result
   * cell counts plus an out-of-band cell scanner) when the client supports it.
   */
  private void addResults(ScanResponse.Builder builder, List<Result> results,
      HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
    // Results from a non-default (replica) region are flagged as stale.
    builder.setStale(!isDefaultRegion);
    if (results.isEmpty()) {
      return;
    }
    if (clientCellBlockSupported) {
      for (Result res : results) {
        builder.addCellsPerResult(res.size());
        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
      }
      controller.setCellScanner(CellUtil.createCellScanner(results));
    } else {
      for (Result res : results) {
        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
        builder.addResults(pbr);
      }
    }
  }

  /**
   * Mutate a list of rows atomically.
   * @param cellScanner if non-null, the mutation data -- the Cell content.
 */
  private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
      final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
      CompareOperator op, ByteArrayComparable comparator, TimeRange timeRange,
      RegionActionResult.Builder builder, ActivePolicyEnforcement spaceQuotaEnforcement)
      throws IOException {
    // Count of mutations whose cells were fully consumed from cellScanner; anything
    // past this index must still be skipped in the finally block to keep the shared
    // scanner aligned for subsequent actions.
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaRegion()) {
        // Reclaim memstore memory before taking on more work (meta region is exempt).
        regionServer.cacheFlusher.reclaimMemStoreMemory();
      }
      RowMutations rm = null;
      int i = 0;
      ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
          ClientProtos.ResultOrException.newBuilder();
      for (ClientProtos.Action action: actions) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
            action.getGet());
        }
        MutationType type = action.getMutation().getMutateType();
        if (rm == null) {
          // Lazily build the RowMutations from the first action's row.
          rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size());
        }
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
            // toPut consumed this mutation's cells: count it as complete before the
            // size/quota checks below, which may throw.
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            rm.add(put);
            break;
          case DELETE:
            Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
            ++countOfCompleteMutation;
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            rm.add(del);
            break;
          default:
            throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
        }
        // To unify the response format with doNonAtomicRegionMutation and read through client's
        // AsyncProcess we have to add an empty result instance per operation
        resultOrExceptionOrBuilder.clear();
        resultOrExceptionOrBuilder.setIndex(i++);
        builder.addResultOrException(
            resultOrExceptionOrBuilder.build());
      }
      return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm);
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }

  /**
   * Execute an append mutation.
   *
   * @return result to return to client if default operation should be
   * bypassed as indicated by RegionObserver, null otherwise
   */
  private Result append(final HRegion region, final OperationQuota quota,
      final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
      ActivePolicyEnforcement spaceQuota)
      throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
    checkCellSizeLimit(region, append);
    spaceQuota.getPolicyEnforcement(region).check(append);
    quota.addMutation(append);
    Result r = null;
    if (region.getCoprocessorHost() != null) {
      // A non-null result means a coprocessor bypassed the default append.
      r = region.getCoprocessorHost().preAppend(append);
    }
    if (r == null) {
      boolean canProceed = startNonceOperation(mutation, nonceGroup);
      boolean success = false;
      try {
        long nonce = mutation.hasNonce() ?
mutation.getNonce() : HConstants.NO_NONCE; 689 if (canProceed) { 690 r = region.append(append, nonceGroup, nonce); 691 } else { 692 // convert duplicate append to get 693 List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false, 694 nonceGroup, nonce); 695 r = Result.create(results); 696 } 697 success = true; 698 } finally { 699 if (canProceed) { 700 endNonceOperation(mutation, nonceGroup, success); 701 } 702 } 703 if (region.getCoprocessorHost() != null) { 704 r = region.getCoprocessorHost().postAppend(append, r); 705 } 706 } 707 if (regionServer.metricsRegionServer != null) { 708 regionServer.metricsRegionServer.updateAppend( 709 region.getTableDescriptor().getTableName(), 710 EnvironmentEdgeManager.currentTime() - before); 711 } 712 return r == null ? Result.EMPTY_RESULT : r; 713 } 714 715 /** 716 * Execute an increment mutation. 717 * 718 * @param region 719 * @param mutation 720 * @return the Result 721 * @throws IOException 722 */ 723 private Result increment(final HRegion region, final OperationQuota quota, 724 final MutationProto mutation, final CellScanner cells, long nonceGroup, 725 ActivePolicyEnforcement spaceQuota) 726 throws IOException { 727 long before = EnvironmentEdgeManager.currentTime(); 728 Increment increment = ProtobufUtil.toIncrement(mutation, cells); 729 checkCellSizeLimit(region, increment); 730 spaceQuota.getPolicyEnforcement(region).check(increment); 731 quota.addMutation(increment); 732 Result r = null; 733 if (region.getCoprocessorHost() != null) { 734 r = region.getCoprocessorHost().preIncrement(increment); 735 } 736 if (r == null) { 737 boolean canProceed = startNonceOperation(mutation, nonceGroup); 738 boolean success = false; 739 try { 740 long nonce = mutation.hasNonce() ? 
mutation.getNonce() : HConstants.NO_NONCE; 741 if (canProceed) { 742 r = region.increment(increment, nonceGroup, nonce); 743 } else { 744 // convert duplicate increment to get 745 List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup, 746 nonce); 747 r = Result.create(results); 748 } 749 success = true; 750 } finally { 751 if (canProceed) { 752 endNonceOperation(mutation, nonceGroup, success); 753 } 754 } 755 if (region.getCoprocessorHost() != null) { 756 r = region.getCoprocessorHost().postIncrement(increment, r); 757 } 758 } 759 if (regionServer.metricsRegionServer != null) { 760 regionServer.metricsRegionServer.updateIncrement( 761 region.getTableDescriptor().getTableName(), 762 EnvironmentEdgeManager.currentTime() - before); 763 } 764 return r == null ? Result.EMPTY_RESULT : r; 765 } 766 767 /** 768 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when 769 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation. 770 * @param cellsToReturn Could be null. May be allocated in this method. This is what this 771 * method returns as a 'result'. 772 * @param closeCallBack the callback to be used with multigets 773 * @param context the current RpcCallContext 774 * @return Return the <code>cellScanner</code> passed 775 */ 776 private List<CellScannable> doNonAtomicRegionMutation(final HRegion region, 777 final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner, 778 final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup, 779 final RegionScannersCloseCallBack closeCallBack, RpcCallContext context, 780 ActivePolicyEnforcement spaceQuotaEnforcement) { 781 // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do 782 // one at a time, we instead pass them in batch. 
Be aware that the corresponding 783 // ResultOrException instance that matches each Put or Delete is then added down in the 784 // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are 785 // deferred/batched 786 List<ClientProtos.Action> mutations = null; 787 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); 788 IOException sizeIOE = null; 789 Object lastBlock = null; 790 ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = ResultOrException.newBuilder(); 791 boolean hasResultOrException = false; 792 for (ClientProtos.Action action : actions.getActionList()) { 793 hasResultOrException = false; 794 resultOrExceptionBuilder.clear(); 795 try { 796 Result r = null; 797 798 if (context != null 799 && context.isRetryImmediatelySupported() 800 && (context.getResponseCellSize() > maxQuotaResultSize 801 || context.getResponseBlockSize() + context.getResponseExceptionSize() 802 > maxQuotaResultSize)) { 803 804 // We're storing the exception since the exception and reason string won't 805 // change after the response size limit is reached. 806 if (sizeIOE == null ) { 807 // We don't need the stack un-winding do don't throw the exception. 808 // Throwing will kill the JVM's JIT. 809 // 810 // Instead just create the exception and then store it. 811 sizeIOE = new MultiActionResultTooLarge("Max size exceeded" 812 + " CellSize: " + context.getResponseCellSize() 813 + " BlockSize: " + context.getResponseBlockSize()); 814 815 // Only report the exception once since there's only one request that 816 // caused the exception. Otherwise this number will dominate the exceptions count. 817 rpcServer.getMetrics().exception(sizeIOE); 818 } 819 820 // Now that there's an exception is known to be created 821 // use it for the response. 822 // 823 // This will create a copy in the builder. 
824 NameBytesPair pair = ResponseConverter.buildException(sizeIOE); 825 resultOrExceptionBuilder.setException(pair); 826 context.incrementResponseExceptionSize(pair.getSerializedSize()); 827 resultOrExceptionBuilder.setIndex(action.getIndex()); 828 builder.addResultOrException(resultOrExceptionBuilder.build()); 829 skipCellsForMutation(action, cellScanner); 830 continue; 831 } 832 if (action.hasGet()) { 833 long before = EnvironmentEdgeManager.currentTime(); 834 ClientProtos.Get pbGet = action.getGet(); 835 // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do 836 // a get closest before. Throwing the UnknownProtocolException signals it that it needs 837 // to switch and do hbase2 protocol (HBase servers do not tell clients what versions 838 // they are; its a problem for non-native clients like asynchbase. HBASE-20225. 839 if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) { 840 throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? 
" + 841 "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " + 842 "reverse Scan."); 843 } 844 try { 845 Get get = ProtobufUtil.toGet(pbGet); 846 if (context != null) { 847 r = get(get, (region), closeCallBack, context); 848 } else { 849 r = region.get(get); 850 } 851 } finally { 852 if (regionServer.metricsRegionServer != null) { 853 regionServer.metricsRegionServer.updateGet( 854 region.getTableDescriptor().getTableName(), 855 EnvironmentEdgeManager.currentTime() - before); 856 } 857 } 858 } else if (action.hasServiceCall()) { 859 hasResultOrException = true; 860 com.google.protobuf.Message result = 861 execServiceOnRegion(region, action.getServiceCall()); 862 ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder = 863 ClientProtos.CoprocessorServiceResult.newBuilder(); 864 resultOrExceptionBuilder.setServiceResult( 865 serviceResultBuilder.setValue( 866 serviceResultBuilder.getValueBuilder() 867 .setName(result.getClass().getName()) 868 // TODO: Copy!!! 869 .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray())))); 870 } else if (action.hasMutation()) { 871 MutationType type = action.getMutation().getMutateType(); 872 if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null && 873 !mutations.isEmpty()) { 874 // Flush out any Puts or Deletes already collected. 
875 doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, 876 spaceQuotaEnforcement); 877 mutations.clear(); 878 } 879 switch (type) { 880 case APPEND: 881 r = append(region, quota, action.getMutation(), cellScanner, nonceGroup, 882 spaceQuotaEnforcement); 883 break; 884 case INCREMENT: 885 r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup, 886 spaceQuotaEnforcement); 887 break; 888 case PUT: 889 case DELETE: 890 // Collect the individual mutations and apply in a batch 891 if (mutations == null) { 892 mutations = new ArrayList<>(actions.getActionCount()); 893 } 894 mutations.add(action); 895 break; 896 default: 897 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); 898 } 899 } else { 900 throw new HBaseIOException("Unexpected Action type"); 901 } 902 if (r != null) { 903 ClientProtos.Result pbResult = null; 904 if (isClientCellBlockSupport(context)) { 905 pbResult = ProtobufUtil.toResultNoData(r); 906 // Hard to guess the size here. Just make a rough guess. 907 if (cellsToReturn == null) { 908 cellsToReturn = new ArrayList<>(); 909 } 910 cellsToReturn.add(r); 911 } else { 912 pbResult = ProtobufUtil.toResult(r); 913 } 914 lastBlock = addSize(context, r, lastBlock); 915 hasResultOrException = true; 916 resultOrExceptionBuilder.setResult(pbResult); 917 } 918 // Could get to here and there was no result and no exception. Presumes we added 919 // a Put or Delete to the collecting Mutations List for adding later. In this 920 // case the corresponding ResultOrException instance for the Put or Delete will be added 921 // down in the doNonAtomicBatchOp method call rather than up here. 
922 } catch (IOException ie) { 923 rpcServer.getMetrics().exception(ie); 924 hasResultOrException = true; 925 NameBytesPair pair = ResponseConverter.buildException(ie); 926 resultOrExceptionBuilder.setException(pair); 927 context.incrementResponseExceptionSize(pair.getSerializedSize()); 928 } 929 if (hasResultOrException) { 930 // Propagate index. 931 resultOrExceptionBuilder.setIndex(action.getIndex()); 932 builder.addResultOrException(resultOrExceptionBuilder.build()); 933 } 934 } 935 // Finish up any outstanding mutations 936 if (!CollectionUtils.isEmpty(mutations)) { 937 doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); 938 } 939 return cellsToReturn; 940 } 941 942 private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException { 943 if (r.maxCellSize > 0) { 944 CellScanner cells = m.cellScanner(); 945 while (cells.advance()) { 946 int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current()); 947 if (size > r.maxCellSize) { 948 String msg = "Cell with size " + size + " exceeds limit of " + r.maxCellSize + " bytes"; 949 if (LOG.isDebugEnabled()) { 950 LOG.debug(msg); 951 } 952 throw new DoNotRetryIOException(msg); 953 } 954 } 955 } 956 } 957 958 private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, 959 final OperationQuota quota, final List<ClientProtos.Action> mutations, 960 final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) 961 throws IOException { 962 // Just throw the exception. The exception will be caught and then added to region-level 963 // exception for RegionAction. Leaving the null to action result is ok since the null 964 // result is viewed as failure by hbase client. And the region-lever exception will be used 965 // to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and 966 // AsyncBatchRpcRetryingCaller#onComplete for more details. 
  /**
   * Run a batch of mutations non-atomically; on failure, attach the exception to every
   * action's result rather than failing the whole RegionAction.
   */
  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
      final OperationQuota quota, final List<ClientProtos.Action> mutations,
      final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
    try {
      doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, false);
    } catch (IOException e) {
      // Set the exception for each action. The mutations in same RegionAction are group to
      // different batch and then be processed individually. Hence, we don't set the region-level
      // exception here for whole RegionAction.
      for (Action mutation : mutations) {
        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
      }
    }
  }

  /**
   * Execute a list of Put/Delete mutations.
   *
   * @param builder
   * @param region
   * @param mutations
   */
  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
      final OperationQuota quota, final List<ClientProtos.Action> mutations,
      final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
      throws IOException {
    Mutation[] mArray = new Mutation[mutations.size()];
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      /** HBASE-17924
       * mutationActionMap is a map to map the relation between mutations and actions
       * since mutation array may have been reoredered.In order to return the right
       * result or exception to the corresponding actions, We need to know which action
       * is the mutation belong to. We can't sort ClientProtos.Action array, since they
       * are bonded to cellscanners.
       */
      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
      int i = 0;
      for (ClientProtos.Action action: mutations) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
            action.getGet());
        }
        MutationProto m = action.getMutation();
        Mutation mutation;
        if (m.getMutateType() == MutationType.PUT) {
          mutation = ProtobufUtil.toPut(m, cells);
          batchContainsPuts = true;
        } else {
          mutation = ProtobufUtil.toDelete(m, cells);
          batchContainsDelete = true;
        }
        mutationActionMap.put(mutation, action);
        mArray[i++] = mutation;
        checkCellSizeLimit(region, mutation);
        // Check if a space quota disallows this mutation
        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
        quota.addMutation(mutation);
      }

      if (!region.getRegionInfo().isMetaRegion()) {
        regionServer.cacheFlusher.reclaimMemStoreMemory();
      }

      // HBASE-17924
      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
      // order is preserved as its expected from the client
      if (!atomic) {
        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
      }

      OperationStatus[] codes = region.batchMutate(mArray, atomic, HConstants.NO_NONCE,
        HConstants.NO_NONCE);
      for (i = 0; i < codes.length; i++) {
        Mutation currentMutation = mArray[i];
        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
        // Non-atomic batches were sorted above, so the client's index comes from the action,
        // not from the (possibly reordered) array position.
        int index = currentAction.hasIndex() || !atomic ? currentAction.getIndex() : i;
        Exception e = null;
        // NOTE: 'default' appears before SUCCESS/STORE_TOO_BUSY; that is legal Java and each
        // case breaks, so only the matching case (or default for unlisted codes) runs.
        switch (codes[i].getOperationStatusCode()) {
          case BAD_FAMILY:
            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SANITY_CHECK_FAILURE:
            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          default:
            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SUCCESS:
            builder.addResultOrException(getResultOrException(
              ClientProtos.Result.getDefaultInstance(), index));
            break;

          case STORE_TOO_BUSY:
            e = new RegionTooBusyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;
        }
      }
    } finally {
      int processedMutationIndex = 0;
      for (Action mutation : mutations) {
        // The non-null mArray[i] means the cell scanner has been read.
        if (mArray[processedMutationIndex++] == null) {
          skipCellsForMutation(mutation, cells);
        }
      }
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  /**
   * Record per-table put/delete batch latencies, if the metrics source is wired up.
   */
  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
      boolean batchContainsDelete) {
    if (regionServer.metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      if (batchContainsPuts) {
        regionServer.metricsRegionServer
          .updatePutBatch(region.getTableDescriptor().getTableName(), after - starttime);
      }
      if (batchContainsDelete) {
        regionServer.metricsRegionServer
          .updateDeleteBatch(region.getTableDescriptor().getTableName(), after - starttime);
      }
    }
  }
  /**
   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
   * @param region
   * @param mutations
   * @param replaySeqId
   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
   *         exceptionMessage if any
   * @throws IOException
   */
  private OperationStatus [] doReplayBatchOp(final HRegion region,
      final List<MutationReplay> mutations, long replaySeqId) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
        MutationReplay m = it.next();

        if (m.getType() == MutationType.PUT) {
          batchContainsPuts = true;
        } else {
          batchContainsDelete = true;
        }

        // Mutations carrying METAFAMILY cells are WAL markers (compaction/flush/region-event/
        // bulk-load), not user data; replay them here and remove them from the batch.
        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
        if (metaCells != null && !metaCells.isEmpty()) {
          for (Cell metaCell : metaCells) {
            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
            HRegion hRegion = region;
            if (compactionDesc != null) {
              // replay the compaction. Remove the files from stores only if we are the primary
              // region replica (thus own the files)
              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
                replaySeqId);
              continue;
            }
            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
            if (flushDesc != null && !isDefaultReplica) {
              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
              continue;
            }
            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
            if (regionEvent != null && !isDefaultReplica) {
              hRegion.replayWALRegionEventMarker(regionEvent);
              continue;
            }
            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
            if (bulkLoadEvent != null) {
              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
              continue;
            }
          }
          it.remove();
        }
      }
      requestCount.increment();
      if (!region.getRegionInfo().isMetaRegion()) {
        regionServer.cacheFlusher.reclaimMemStoreMemory();
      }
      return region.batchReplay(mutations.toArray(
        new MutationReplay[mutations.size()]), replaySeqId);
    } finally {
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  /** Close every registered scanner; later client calls will get UnknownScannerException. */
  private void closeAllScanners() {
    // Close any outstanding scanners. Means they'll get an UnknownScanner
    // exception next time they come in.
    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
      try {
        e.getValue().s.close();
      } catch (IOException ioe) {
        LOG.warn("Closing scanner " + e.getKey(), ioe);
      }
    }
  }

  // Exposed for testing
  interface LogDelegate {
    void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThreshold);
  }

  // Default delegate: warn (with client identity) when a multi request exceeds the
  // configured row-count threshold. See HBASE-18023.
  private static LogDelegate DEFAULT_LOG_DELEGATE = new LogDelegate() {
    @Override
    public void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThreshold) {
      if (LOG.isWarnEnabled()) {
        LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
          + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
          + RpcServer.getRequestUserName().orElse(null) + "/"
          + RpcServer.getRemoteAddress().orElse(null)
          + " first region in multi=" + firstRegionName);
      }
    }
  };

  private final LogDelegate ld;

  public RSRpcServices(HRegionServer rs) throws IOException {
    this(rs, DEFAULT_LOG_DELEGATE);
  }

  // Directly invoked only for testing
  RSRpcServices(HRegionServer rs, LogDelegate ld) throws IOException {
    this.ld = ld;
    regionServer = rs;
    rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
    RpcSchedulerFactory rpcSchedulerFactory;
    try {
      rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class)
          .getDeclaredConstructor().newInstance();
    } catch (NoSuchMethodException | InvocationTargetException |
        InstantiationException | IllegalAccessException e) {
      throw new IllegalArgumentException(e);
    }
    // Server to handle client requests.
    InetSocketAddress initialIsa;
    InetSocketAddress bindAddress;
    if(this instanceof MasterRpcServices) {
      String hostname = getHostname(rs.conf, true);
      int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port);
    } else {
      String hostname = getHostname(rs.conf, false);
      int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT,
        HConstants.DEFAULT_REGIONSERVER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress = new InetSocketAddress(
        rs.conf.get("hbase.regionserver.ipc.address", hostname), port);
    }
    if (initialIsa.getAddress() == null) {
      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
    }
    priority = createPriority();
    // Using Address means we don't get the IP too. Shorten it more even to just the host name
    // w/o the domain.
    String name = rs.getProcessName() + "/" +
        Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain();
    // Set how many times to retry talking to another server over Connection.
    ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
    rpcServer = createRpcServer(rs, rs.conf, rpcSchedulerFactory, bindAddress, name);
    rpcServer.setRsRpcServices(this);
    scannerLeaseTimeoutPeriod = rs.conf.getInt(
      HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    maxScannerResultSize = rs.conf.getLong(
      HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
    rpcTimeout = rs.conf.getInt(
      HConstants.HBASE_RPC_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = rs.conf.getLong(
      REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);

    InetSocketAddress address = rpcServer.getListenerAddress();
    if (address == null) {
      throw new IOException("Listener channel is closed");
    }
    // Set our address, however we need the final port that was given to rpcServer
    isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
    rpcServer.setErrorHandler(this);
    rs.setName(name);

    closedScanners = CacheBuilder.newBuilder()
        .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }
  /**
   * Build the RPC server bound to {@code bindAddress}; overridable so subclasses can supply a
   * different transport.
   */
  protected RpcServerInterface createRpcServer(Server server, Configuration conf,
      RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name)
      throws IOException {
    boolean reservoirEnabled = conf.getBoolean(RESERVOIR_ENABLED_KEY, true);
    try {
      return RpcServerFactory.createRpcServer(server, name, getServices(),
        bindAddress, // use final bindAddress for this server.
        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
    } catch (BindException be) {
      // Preserve the underlying cause when present so the port conflict is diagnosable.
      throw new IOException(be.getMessage() + ". To switch ports use the '"
        + HConstants.REGIONSERVER_PORT + "' configuration property.",
        be.getCause() != null ? be.getCause() : be);
    }
  }

  protected Class<?> getRpcSchedulerFactoryClass() {
    return this.regionServer.conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  @Override
  public void onConfigurationChange(Configuration newConf) {
    // Forward live config updates to the rpc server when it supports them.
    if (rpcServer instanceof ConfigurationObserver) {
      ((ConfigurationObserver)rpcServer).onConfigurationChange(newConf);
    }
  }

  protected PriorityFunction createPriority() {
    return new AnnotationReadingPriorityFunction(this);
  }

  /** No-op unless authorization is enabled (accessChecker is set in {@code start}). */
  protected void requirePermission(String request, Permission.Action perm) throws IOException {
    if (accessChecker != null) {
      accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), request, null, perm);
    }
  }


  /**
   * Resolve the hostname for this process: the configured override if set, otherwise a
   * DNS lookup against the configured interface/nameserver.
   */
  public static String getHostname(Configuration conf, boolean isMaster)
      throws UnknownHostException {
    String hostname = conf.get(isMaster? HRegionServer.MASTER_HOSTNAME_KEY :
      HRegionServer.RS_HOSTNAME_KEY);
    if (hostname == null || hostname.isEmpty()) {
      String masterOrRS = isMaster ? "master" : "regionserver";
      return Strings.domainNamePointerToHostName(DNS.getDefaultHost(
        conf.get("hbase." + masterOrRS + ".dns.interface", "default"),
        conf.get("hbase." + masterOrRS + ".dns.nameserver", "default")));
    } else {
      LOG.info("hostname is configured to be " + hostname);
      return hostname;
    }
  }
  @VisibleForTesting
  public int getScannersCount() {
    return scanners.size();
  }

  /** @return the open RegionScanner for {@code scannerId}, or null if not registered. */
  public RegionScanner getScanner(long scannerId) {
    String scannerIdString = Long.toString(scannerId);
    RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
    if (scannerHolder != null) {
      return scannerHolder.s;
    }
    return null;
  }

  /** @return "table/region" description for the scanner, or null if it is unknown. */
  public String getScanDetailsWithId(long scannerId) {
    RegionScanner scanner = getScanner(scannerId);
    if (scanner == null) {
      return null;
    }
    StringBuilder builder = new StringBuilder();
    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
    return builder.toString();
  }

  /**
   * Get the vtime associated with the scanner.
   * Currently the vtime is the number of "next" calls.
   */
  long getScannerVirtualTime(long scannerId) {
    String scannerIdString = Long.toString(scannerId);
    RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
    if (scannerHolder != null) {
      return scannerHolder.getNextCallSeq();
    }
    return 0L;
  }

  /**
   * Method to account for the size of retained cells and retained data blocks.
   * @param context rpc call context
   * @param r result to add size.
   * @param lastBlock last block to check whether we need to add the block size in context.
   * @return an object that represents the last referenced block from this response.
   */
  Object addSize(RpcCallContext context, Result r, Object lastBlock) {
    if (context != null && r != null && !r.isEmpty()) {
      for (Cell c : r.rawCells()) {
        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));

        // Since byte buffers can point all kinds of crazy places it's harder to keep track
        // of which blocks are kept alive by what byte buffer.
        // So we make a guess.
        if (c instanceof ByteBufferExtendedCell) {
          ByteBufferExtendedCell bbCell = (ByteBufferExtendedCell) c;
          ByteBuffer bb = bbCell.getValueByteBuffer();
          // Identity (not equals) comparison on purpose: same backing buffer == same block.
          if (bb != lastBlock) {
            context.incrementResponseBlockSize(bb.capacity());
            lastBlock = bb;
          }
        } else {
          // We're using the last block being the same as the current block as
          // a proxy for pointing to a new block. This won't be exact.
          // If there are multiple gets that bounce back and forth
          // Then it's possible that this will over count the size of
          // referenced blocks. However it's better to over count and
          // use two rpcs than to OOME the regionserver.
          byte[] valueArray = c.getValueArray();
          if (valueArray != lastBlock) {
            context.incrementResponseBlockSize(valueArray.length);
            lastBlock = valueArray;
          }
        }

      }
    }
    return lastBlock;
  }

  /**
   * Register a new scanner under {@code scannerName} with a lease and the callbacks that close
   * it or renew its lease when the RPC is shipped.
   */
  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
      HRegion r, boolean needCursor) throws LeaseStillHeldException {
    Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
      new ScannerListener(scannerName));
    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
    RpcCallback closeCallback;
    if (s instanceof RpcCallback) {
      closeCallback = (RpcCallback) s;
    } else {
      closeCallback = new RegionScannerCloseCallBack(s);
    }
    RegionScannerHolder rsh =
        new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback, needCursor);
    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! " +
      scannerName;
    return rsh;
  }
" +
      scannerName;
    return rsh;
  }

  /**
   * Find the HRegion based on a region specifier
   *
   * @param regionSpecifier the region specifier
   * @return the corresponding region
   * @throws IOException if the specifier is not null,
   *    but failed to find the region
   */
  @VisibleForTesting
  public HRegion getRegion(
      final RegionSpecifier regionSpecifier) throws IOException {
    return regionServer.getRegion(regionSpecifier.getValue().toByteArray());
  }

  /**
   * Find the List of HRegions based on a list of region specifiers.
   * <p>
   * Unlike {@link #getRegion(RegionSpecifier)}, a specifier naming a region this server
   * does not host is not fatal: the {@link NotServingRegionException} is recorded in
   * {@code stats} and that region is simply left out of the returned list.
   *
   * @param regionSpecifiers the list of region specifiers
   * @param stats collector for per-region lookup failures
   * @return the regions that were found on this server
   */
  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
      final CacheEvictionStatsBuilder stats) {
    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
    for (RegionSpecifier regionSpecifier: regionSpecifiers) {
      try {
        regions.add(regionServer.getRegion(regionSpecifier.getValue().toByteArray()));
      } catch (NotServingRegionException e) {
        // Record the miss instead of failing the whole batch.
        stats.addException(regionSpecifier.getValue().toByteArray(), e);
      }
    }
    return regions;
  }

  @VisibleForTesting
  public PriorityFunction getPriority() {
    return priority;
  }

  @VisibleForTesting
  public Configuration getConfiguration() {
    return regionServer.getConfiguration();
  }

  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return regionServer.getRegionServerRpcQuotaManager();
  }

  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return regionServer.getRegionServerSpaceQuotaManager();
  }

  /**
   * Start the RPC services: set up the access checker (only when authorization is
   * enabled in the configuration), then the scanner id generator, and finally the
   * RPC server itself.
   */
  void start(ZKWatcher zkWatcher) {
    if (AccessChecker.isAuthorizationSupported(getConfiguration())) {
      accessChecker = new AccessChecker(getConfiguration(), zkWatcher);
    }
    this.scannerIdGenerator = new ScannerIdGenerator(this.regionServer.serverName);
    rpcServer.start();
  }

  /**
   * Stop the RPC services: tear down the access checker (if one was started), close
   * all open scanners, then stop the RPC server.
   */
  void stop() {
    if (accessChecker != null) {
      accessChecker.stop();
    }
    closeAllScanners();
    rpcServer.stop();
  }

  /**
   * Called to verify that this server is up and running.
   *
   * @throws IOException if the server is aborted, stopped, has lost its filesystem,
   *   or has not finished coming online yet
   */
  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
  protected void checkOpen() throws IOException {
    if (regionServer.isAborted()) {
      throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
    }
    if (regionServer.isStopped()) {
      throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
    }
    if (!regionServer.fsOk) {
      throw new RegionServerStoppedException("File system not available");
    }
    if (!regionServer.isOnline()) {
      throw new ServerNotRunningYetException("Server " + regionServer.serverName
        + " is not running yet");
    }
  }

  /**
   * By default, put up an Admin and a Client Service.
   * Set booleans <code>hbase.regionserver.admin.executorService</code> and
   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
   * Default is that both are enabled.
1522 * @return immutable list of blocking services and the security info classes that this server 1523 * supports 1524 */ 1525 protected List<BlockingServiceAndInterface> getServices() { 1526 boolean admin = 1527 getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true); 1528 boolean client = 1529 getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true); 1530 List<BlockingServiceAndInterface> bssi = new ArrayList<>(); 1531 if (client) { 1532 bssi.add(new BlockingServiceAndInterface( 1533 ClientService.newReflectiveBlockingService(this), 1534 ClientService.BlockingInterface.class)); 1535 } 1536 if (admin) { 1537 bssi.add(new BlockingServiceAndInterface( 1538 AdminService.newReflectiveBlockingService(this), 1539 AdminService.BlockingInterface.class)); 1540 } 1541 return new org.apache.hbase.thirdparty.com.google.common.collect. 1542 ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build(); 1543 } 1544 1545 public InetSocketAddress getSocketAddress() { 1546 return isa; 1547 } 1548 1549 @Override 1550 public int getPriority(RequestHeader header, Message param, User user) { 1551 return priority.getPriority(header, param, user); 1552 } 1553 1554 @Override 1555 public long getDeadline(RequestHeader header, Message param) { 1556 return priority.getDeadline(header, param); 1557 } 1558 1559 /* 1560 * Check if an OOME and, if so, abort immediately to avoid creating more objects. 1561 * 1562 * @param e 1563 * 1564 * @return True if we OOME'd and are aborting. 
1565 */ 1566 @Override 1567 public boolean checkOOME(final Throwable e) { 1568 return exitIfOOME(e); 1569 } 1570 1571 public static boolean exitIfOOME(final Throwable e ){ 1572 boolean stop = false; 1573 try { 1574 if (e instanceof OutOfMemoryError 1575 || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError) 1576 || (e.getMessage() != null && e.getMessage().contains( 1577 "java.lang.OutOfMemoryError"))) { 1578 stop = true; 1579 LOG.error(HBaseMarkers.FATAL, "Run out of memory; " 1580 + RSRpcServices.class.getSimpleName() + " will abort itself immediately", 1581 e); 1582 } 1583 } finally { 1584 if (stop) { 1585 Runtime.getRuntime().halt(1); 1586 } 1587 } 1588 return stop; 1589 } 1590 1591 /** 1592 * Close a region on the region server. 1593 * 1594 * @param controller the RPC controller 1595 * @param request the request 1596 * @throws ServiceException 1597 */ 1598 @Override 1599 @QosPriority(priority=HConstants.ADMIN_QOS) 1600 public CloseRegionResponse closeRegion(final RpcController controller, 1601 final CloseRegionRequest request) throws ServiceException { 1602 final ServerName sn = (request.hasDestinationServer() ? 1603 ProtobufUtil.toServerName(request.getDestinationServer()) : null); 1604 1605 try { 1606 checkOpen(); 1607 throwOnWrongStartCode(request); 1608 final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion()); 1609 1610 requestCount.increment(); 1611 if (sn == null) { 1612 LOG.info("Close " + encodedRegionName + " without moving"); 1613 } else { 1614 LOG.info("Close " + encodedRegionName + ", moving to " + sn); 1615 } 1616 boolean closed = regionServer.closeRegion(encodedRegionName, false, sn); 1617 CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed); 1618 return builder.build(); 1619 } catch (IOException ie) { 1620 throw new ServiceException(ie); 1621 } 1622 } 1623 1624 /** 1625 * Compact a region on the region server. 
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public CompactRegionResponse compactRegion(final RpcController controller,
      final CompactRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      // Quota support is enabled, the requesting user is not system/super user
      // and a quota policy is enforced that disables compactions.
      if (QuotaUtil.isQuotaEnabled(getConfiguration()) &&
          !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null)) &&
          this.regionServer.getRegionServerSpaceQuotaManager()
              .areCompactionsDisabled(region.getTableDescriptor().getTableName())) {
        throw new DoNotRetryIOException(
            "Compactions on this region are " + "disabled due to a space quota violation.");
      }
      // Guard against compacting a region that is closing/splitting.
      region.startRegionOperation(Operation.COMPACT_REGION);
      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
      // Protobuf optional: only honor the major flag when it was actually set.
      boolean major = request.hasMajor() && request.getMajor();
      if (request.hasFamily()) {
        // Family-scoped compaction request.
        byte[] family = request.getFamily().toByteArray();
        String log = "User-triggered " + (major ? "major " : "") + "compaction for region " +
            region.getRegionInfo().getRegionNameAsString() + " and family " +
            Bytes.toString(family);
        LOG.trace(log);
        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
          CompactionLifeCycleTracker.DUMMY);
      } else {
        // Whole-region compaction request.
        String log = "User-triggered " + (major ? "major " : "") + "compaction for region " +
            region.getRegionInfo().getRegionNameAsString();
        LOG.trace(log);
        region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
      }
      return CompactRegionResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Turn compactions on or off on this region server, returning the previous state
   * so the caller can restore it later. A no-op when the requested state matches
   * the current state.
   */
  @Override
  public CompactionSwitchResponse compactionSwitch(RpcController controller,
      CompactionSwitchRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      boolean prevState = regionServer.compactSplitThread.isCompactionsEnabled();
      CompactionSwitchResponse response =
          CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
      if (prevState == request.getEnabled()) {
        // passed in requested state is same as current state. No action required
        return response;
      }
      regionServer.compactSplitThread.switchCompaction(request.getEnabled());
      return response;
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Flush a region on the region server.
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public FlushRegionResponse flushRegion(final RpcController controller,
      final FlushRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
      boolean shouldFlush = true;
      if (request.hasIfOlderThanTs()) {
        // Skip the flush when every store has already flushed since the given timestamp.
        shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
      }
      FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
      if (shouldFlush) {
        boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ?
            request.getWriteFlushWalMarker() : false;
        // Go behind the curtain so we can manage writing of the flush WAL marker
        HRegion.FlushResultImpl flushResult =
            region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
        boolean compactionNeeded = flushResult.isCompactionNeeded();
        if (compactionNeeded) {
          regionServer.compactSplitThread.requestSystemCompaction(region,
            "Compaction through user triggered flush");
        }
        builder.setFlushed(flushResult.isFlushSucceeded());
        builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
      }
      builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
      return builder.build();
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server.
      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
      throw new ServiceException(ex);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * List the RegionInfo of every region currently online on this server,
   * sorted by {@link RegionInfo#COMPARATOR}.
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
      final GetOnlineRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      Map<String, HRegion> onlineRegions = regionServer.onlineRegions;
      List<RegionInfo> list = new ArrayList<>(onlineRegions.size());
      for (HRegion region: onlineRegions.values()) {
        list.add(region.getRegionInfo());
      }
      Collections.sort(list, RegionInfo.COMPARATOR);
      return ResponseConverter.buildGetOnlineRegionResponse(list);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  // Master implementation of this Admin Service differs given it is not
  // able to supply detail only known to RegionServer. See note on
  // MasterRpcServers#getRegionInfo.
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
      final GetRegionInfoRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      RegionInfo info = region.getRegionInfo();
      byte[] bestSplitRow = null;
      boolean shouldSplit = true;
      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
        HRegion r = region;
        region.startRegionOperation(Operation.SPLIT_REGION);
        r.forceSplit(null);
        // Even after setting force split if split policy says no to split then we should not split.
        shouldSplit = region.getSplitPolicy().shouldSplit() && !info.isMetaRegion();
        bestSplitRow = r.checkSplit();
        // when all table data are in memstore, bestSplitRow = null
        // try to flush region first
        if(bestSplitRow == null) {
          r.flush(true);
          bestSplitRow = r.checkSplit();
        }
        // Undo the forceSplit flag set above; this request only computes the row.
        r.clearSplit();
      }
      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
      if (request.hasCompactionState() && request.getCompactionState()) {
        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
      }
      builder.setSplittable(region.isSplittable() && shouldSplit);
      builder.setMergeable(region.isMergeable());
      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Report RegionLoad for every region on this server, or only for the regions of
   * the requested table when one is given.
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public GetRegionLoadResponse getRegionLoad(RpcController controller,
      GetRegionLoadRequest request) throws ServiceException {

    List<HRegion> regions;
    if (request.hasTableName()) {
      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
      regions = regionServer.getRegions(tableName);
    } else {
      regions = regionServer.getRegions();
    }
    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
    // Builders are reused across regions to cut allocation.
    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();

    try {
      for (HRegion region : regions) {
        rLoads.add(regionServer.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
      }
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
    builder.addAllRegionLoads(rLoads);
    return builder.build();
  }

  /**
   * Clear the named compaction queues ("long" and/or "short"). Only one such request
   * may run at a time; concurrent requests are skipped (the flag below guards this).
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
      ClearCompactionQueuesRequest request) throws ServiceException {
    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
        + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
    requestCount.increment();
    // CAS guarantees only one clear runs at a time; the flag is reset in the finally.
    if (clearCompactionQueues.compareAndSet(false,true)) {
      try {
        checkOpen();
        regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues();
        for (String queueName : request.getQueueNameList()) {
          LOG.debug("clear " + queueName + " compaction queue");
          switch (queueName) {
            case "long":
              regionServer.compactSplitThread.clearLongCompactionsQueue();
              break;
            case "short":
              regionServer.compactSplitThread.clearShortCompactionsQueue();
              break;
            default:
              LOG.warn("Unknown queue name " + queueName);
              throw new IOException("Unknown queue name " + queueName);
          }
        }
        regionServer.getRegionServerCoprocessorHost().postClearCompactionQueues();
      } catch (IOException ie) {
        throw new ServiceException(ie);
      } finally {
        clearCompactionQueues.set(false);
      }
    } else {
      LOG.warn("Clear compactions queue is executing by other admin.");
    }
    return respBuilder.build();
  }

  /**
   * Get some information of the region server.
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public GetServerInfoResponse getServerInfo(final RpcController controller,
      final GetServerInfoRequest request) throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
    requestCount.increment();
    // -1 signals that no info server is running.
    int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
    return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
  }

  /**
   * List the store file paths of a region, restricted to the requested column
   * families; when no family is given, all families of the table are listed.
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public GetStoreFileResponse getStoreFile(final RpcController controller,
      final GetStoreFileRequest request) throws ServiceException {
    try {
      checkOpen();
      HRegion region = getRegion(request.getRegion());
      requestCount.increment();
      Set<byte[]> columnFamilies;
      if (request.getFamilyCount() == 0) {
        // No family given: cover every family in the table.
        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
      } else {
        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
        for (ByteString cf: request.getFamilyList()) {
          columnFamilies.add(cf.toByteArray());
        }
      }
      int nCF = columnFamilies.size();
      List<String> fileList = region.getStoreFileList(
        columnFamilies.toArray(new byte[nCF][]));
      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
      builder.addAllStoreFile(fileList);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  // Start-code checks below reject RPCs that were addressed to an earlier incarnation
  // of this server. Requests without a start code are allowed through with a warning
  // (presumably for backward compatibility with older masters — confirm).
  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
    // check that we are the same server that this RPC is intended for.
    if (regionServer.serverName.getStartcode() != serverStartCode) {
      throw new ServiceException(new DoNotRetryIOException(
        "This RPC was intended for a " + "different server with startCode: " + serverStartCode +
          ", this server is: " + regionServer.serverName));
    }
  }

  // Validates every open/close sub-request embedded in an ExecuteProceduresRequest.
  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
    if (req.getOpenRegionCount() > 0) {
      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
        throwOnWrongStartCode(openReq);
      }
    }
    if (req.getCloseRegionCount() > 0) {
      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
        throwOnWrongStartCode(closeReq);
      }
    }
  }

  /**
   * Open asynchronously a region or a set of regions on the region server.
   *
   * The opening is coordinated by ZooKeeper, and this method requires the znode to be created
   * before being called. As a consequence, this method should be called only from the master.
   * <p>
   * The different states managed for the region are:
   * </p><ul>
   * <li>region not opened: the region opening will start asynchronously.</li>
   * <li>a close is already in progress: this is considered as an error.</li>
   * <li>an open is already in progress: this new open request will be ignored.
This is important 1968 * because the Master can do multiple requests if it crashes.</li> 1969 * <li>the region is already opened: this new open request will be ignored.</li> 1970 * </ul> 1971 * <p> 1972 * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign. 1973 * For a single region opening, errors are sent through a ServiceException. For bulk assign, 1974 * errors are put in the response as FAILED_OPENING. 1975 * </p> 1976 * @param controller the RPC controller 1977 * @param request the request 1978 * @throws ServiceException 1979 */ 1980 @Override 1981 @QosPriority(priority=HConstants.ADMIN_QOS) 1982 public OpenRegionResponse openRegion(final RpcController controller, 1983 final OpenRegionRequest request) throws ServiceException { 1984 requestCount.increment(); 1985 throwOnWrongStartCode(request); 1986 1987 OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder(); 1988 final int regionCount = request.getOpenInfoCount(); 1989 final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount); 1990 final boolean isBulkAssign = regionCount > 1; 1991 try { 1992 checkOpen(); 1993 } catch (IOException ie) { 1994 TableName tableName = null; 1995 if (regionCount == 1) { 1996 org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri = request.getOpenInfo(0).getRegion(); 1997 if (ri != null) { 1998 tableName = ProtobufUtil.toTableName(ri.getTableName()); 1999 } 2000 } 2001 if (!TableName.META_TABLE_NAME.equals(tableName)) { 2002 throw new ServiceException(ie); 2003 } 2004 // We are assigning meta, wait a little for regionserver to finish initialization. 
2005 int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 2006 HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout 2007 long endTime = System.currentTimeMillis() + timeout; 2008 synchronized (regionServer.online) { 2009 try { 2010 while (System.currentTimeMillis() <= endTime 2011 && !regionServer.isStopped() && !regionServer.isOnline()) { 2012 regionServer.online.wait(regionServer.msgInterval); 2013 } 2014 checkOpen(); 2015 } catch (InterruptedException t) { 2016 Thread.currentThread().interrupt(); 2017 throw new ServiceException(t); 2018 } catch (IOException e) { 2019 throw new ServiceException(e); 2020 } 2021 } 2022 } 2023 2024 long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1; 2025 2026 for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) { 2027 final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion()); 2028 TableDescriptor htd; 2029 try { 2030 String encodedName = region.getEncodedName(); 2031 byte[] encodedNameBytes = region.getEncodedNameAsBytes(); 2032 final HRegion onlineRegion = regionServer.getRegion(encodedName); 2033 if (onlineRegion != null) { 2034 // The region is already online. This should not happen any more. 2035 String error = "Received OPEN for the region:" 2036 + region.getRegionNameAsString() + ", which is already online"; 2037 LOG.warn(error); 2038 //regionServer.abort(error); 2039 //throw new IOException(error); 2040 builder.addOpeningState(RegionOpeningState.OPENED); 2041 continue; 2042 } 2043 LOG.info("Open " + region.getRegionNameAsString()); 2044 2045 final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent( 2046 encodedNameBytes, Boolean.TRUE); 2047 2048 if (Boolean.FALSE.equals(previous)) { 2049 if (regionServer.getRegion(encodedName) != null) { 2050 // There is a close in progress. This should not happen any more. 
2051 String error = "Received OPEN for the region:" 2052 + region.getRegionNameAsString() + ", which we are already trying to CLOSE"; 2053 regionServer.abort(error); 2054 throw new IOException(error); 2055 } 2056 regionServer.regionsInTransitionInRS.put(encodedNameBytes, Boolean.TRUE); 2057 } 2058 2059 if (Boolean.TRUE.equals(previous)) { 2060 // An open is in progress. This is supported, but let's log this. 2061 LOG.info("Receiving OPEN for the region:" + 2062 region.getRegionNameAsString() + ", which we are already trying to OPEN" 2063 + " - ignoring this new request for this region."); 2064 } 2065 2066 // We are opening this region. If it moves back and forth for whatever reason, we don't 2067 // want to keep returning the stale moved record while we are opening/if we close again. 2068 regionServer.removeFromMovedRegions(region.getEncodedName()); 2069 2070 if (previous == null || !previous.booleanValue()) { 2071 htd = htds.get(region.getTable()); 2072 if (htd == null) { 2073 htd = regionServer.tableDescriptors.get(region.getTable()); 2074 htds.put(region.getTable(), htd); 2075 } 2076 if (htd == null) { 2077 throw new IOException("Missing table descriptor for " + region.getEncodedName()); 2078 } 2079 // If there is no action in progress, we can submit a specific handler. 2080 // Need to pass the expected version in the constructor. 
2081 if (regionServer.executorService == null) { 2082 LOG.info("No executor executorService; skipping open request"); 2083 } else { 2084 if (region.isMetaRegion()) { 2085 regionServer.executorService.submit(new OpenMetaHandler( 2086 regionServer, regionServer, region, htd, masterSystemTime)); 2087 } else { 2088 if (regionOpenInfo.getFavoredNodesCount() > 0) { 2089 regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(), 2090 regionOpenInfo.getFavoredNodesList()); 2091 } 2092 if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) { 2093 regionServer.executorService.submit(new OpenPriorityRegionHandler( 2094 regionServer, regionServer, region, htd, masterSystemTime)); 2095 } else { 2096 regionServer.executorService.submit(new OpenRegionHandler( 2097 regionServer, regionServer, region, htd, masterSystemTime)); 2098 } 2099 } 2100 } 2101 } 2102 2103 builder.addOpeningState(RegionOpeningState.OPENED); 2104 } catch (IOException ie) { 2105 LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie); 2106 if (isBulkAssign) { 2107 builder.addOpeningState(RegionOpeningState.FAILED_OPENING); 2108 } else { 2109 throw new ServiceException(ie); 2110 } 2111 } 2112 } 2113 return builder.build(); 2114 } 2115 2116 /** 2117 * Wamrmup a region on this server. 2118 * 2119 * This method should only be called by Master. It synchrnously opens the region and 2120 * closes the region bringing the most important pages in cache. 
2121 * <p> 2122 * 2123 * @param controller the RPC controller 2124 * @param request the request 2125 * @throws ServiceException 2126 */ 2127 @Override 2128 public WarmupRegionResponse warmupRegion(final RpcController controller, 2129 final WarmupRegionRequest request) throws ServiceException { 2130 2131 final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo()); 2132 TableDescriptor htd; 2133 WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance(); 2134 2135 try { 2136 checkOpen(); 2137 String encodedName = region.getEncodedName(); 2138 byte[] encodedNameBytes = region.getEncodedNameAsBytes(); 2139 final HRegion onlineRegion = regionServer.getRegion(encodedName); 2140 2141 if (onlineRegion != null) { 2142 LOG.info("Region already online. Skipping warming up " + region); 2143 return response; 2144 } 2145 2146 htd = regionServer.tableDescriptors.get(region.getTable()); 2147 2148 if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) { 2149 LOG.info("Region is in transition. Skipping warmup " + region); 2150 return response; 2151 } 2152 2153 LOG.info("Warming up region " + region.getRegionNameAsString()); 2154 HRegion.warmupHRegion(region, htd, regionServer.getWAL(region), 2155 regionServer.getConfiguration(), regionServer, null); 2156 2157 } catch (IOException ie) { 2158 LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie); 2159 throw new ServiceException(ie); 2160 } 2161 2162 return response; 2163 } 2164 2165 /** 2166 * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is 2167 * that the given mutations will be durable on the receiving RS if this method returns without any 2168 * exception. 
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException
   */
  @Override
  @QosPriority(priority = HConstants.REPLAY_QOS)
  public ReplicateWALEntryResponse replay(final RpcController controller,
      final ReplicateWALEntryRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    CellScanner cells = ((HBaseRpcController) controller).cellScanner();
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      // All entries in one request must target the same region (checked in the loop below).
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
      RegionCoprocessorHost coprocessorHost =
          ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
            ? region.getCoprocessorHost()
            : null; // do not invoke coprocessors if this is a secondary region replica
      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();

      // Skip adding the edits to WAL if this is a secondary region replica
      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;

      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException("Replay request contains entries from multiple " +
              "regions. First region:" + regionName.toStringUtf8() + " , other region:"
              + entry.getKey().getEncodedRegionName());
        }
        // Replay nonces so retried client operations stay deduplicated after recovery.
        if (regionServer.nonceManager != null && isPrimary) {
          long nonceGroup = entry.getKey().hasNonceGroup()
            ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          regionServer.nonceManager.reportOperationFromWal(
              nonceGroup,
              nonce,
              entry.getKey().getWriteTime());
        }
        // walEntry is only materialized when coprocessors need the WALKey/WALEdit pair.
        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
        List<MutationReplay> edits = WALSplitUtil.getMutationsFromWALEntry(entry,
          cells, walEntry, durability);
        if (coprocessorHost != null) {
          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
          // KeyValue.
          if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
            walEntry.getSecond())) {
            // if bypass this log entry, ignore it ...
            continue;
          }
          walEntries.add(walEntry);
        }
        if(edits!=null && !edits.isEmpty()) {
          // HBASE-17924
          // sort to improve lock efficiency
          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
          long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
            entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
          // check if it's a partial success
          for (int i = 0; result != null && i < result.length; i++) {
            if (result[i] != OperationStatus.SUCCESS) {
              throw new IOException(result[i].getExceptionMsg());
            }
          }
        }
      }

      //sync wal at the end because ASYNC_WAL is used above
      WAL wal = region.getWAL();
      if (wal != null) {
        wal.sync();
      }

      if (coprocessorHost != null) {
        for (Pair<WALKey, WALEdit> entry : walEntries) {
          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
            entry.getSecond());
        }
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record replay latency even on failure.
      if (regionServer.metricsRegionServer != null) {
        regionServer.metricsRegionServer.updateReplay(
          EnvironmentEdgeManager.currentTime() - before);
      }
    }
  }

  /**
   * Replicate WAL entries on the region server.
2267 * 2268 * @param controller the RPC controller 2269 * @param request the request 2270 * @throws ServiceException 2271 */ 2272 @Override 2273 @QosPriority(priority=HConstants.REPLICATION_QOS) 2274 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, 2275 final ReplicateWALEntryRequest request) throws ServiceException { 2276 try { 2277 checkOpen(); 2278 if (regionServer.replicationSinkHandler != null) { 2279 requestCount.increment(); 2280 List<WALEntry> entries = request.getEntryList(); 2281 CellScanner cellScanner = ((HBaseRpcController)controller).cellScanner(); 2282 regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(); 2283 regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner, 2284 request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), 2285 request.getSourceHFileArchiveDirPath()); 2286 regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(); 2287 return ReplicateWALEntryResponse.newBuilder().build(); 2288 } else { 2289 throw new ServiceException("Replication services are not initialized yet"); 2290 } 2291 } catch (IOException ie) { 2292 throw new ServiceException(ie); 2293 } 2294 } 2295 2296 /** 2297 * Roll the WAL writer of the region server. 
2298 * @param controller the RPC controller 2299 * @param request the request 2300 * @throws ServiceException 2301 */ 2302 @Override 2303 public RollWALWriterResponse rollWALWriter(final RpcController controller, 2304 final RollWALWriterRequest request) throws ServiceException { 2305 try { 2306 checkOpen(); 2307 requestCount.increment(); 2308 regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest(); 2309 regionServer.walRoller.requestRollAll(); 2310 regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest(); 2311 RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder(); 2312 return builder.build(); 2313 } catch (IOException ie) { 2314 throw new ServiceException(ie); 2315 } 2316 } 2317 2318 2319 /** 2320 * Stop the region server. 2321 * 2322 * @param controller the RPC controller 2323 * @param request the request 2324 * @throws ServiceException 2325 */ 2326 @Override 2327 @QosPriority(priority=HConstants.ADMIN_QOS) 2328 public StopServerResponse stopServer(final RpcController controller, 2329 final StopServerRequest request) throws ServiceException { 2330 requestCount.increment(); 2331 String reason = request.getReason(); 2332 regionServer.stop(reason); 2333 return StopServerResponse.newBuilder().build(); 2334 } 2335 2336 @Override 2337 public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller, 2338 UpdateFavoredNodesRequest request) throws ServiceException { 2339 List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList(); 2340 UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder(); 2341 for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) { 2342 RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion()); 2343 if (regionUpdateInfo.getFavoredNodesCount() > 0) { 2344 regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(), 2345 regionUpdateInfo.getFavoredNodesList()); 2346 } 2347 } 
2348 respBuilder.setResponse(openInfoList.size()); 2349 return respBuilder.build(); 2350 } 2351 2352 /** 2353 * Atomically bulk load several HFiles into an open region 2354 * @return true if successful, false is failed but recoverably (no action) 2355 * @throws ServiceException if failed unrecoverably 2356 */ 2357 @Override 2358 public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller, 2359 final BulkLoadHFileRequest request) throws ServiceException { 2360 long start = EnvironmentEdgeManager.currentTime(); 2361 List<String> clusterIds = new ArrayList<>(request.getClusterIdsList()); 2362 if(clusterIds.contains(this.regionServer.clusterId)){ 2363 return BulkLoadHFileResponse.newBuilder().setLoaded(true).build(); 2364 } else { 2365 clusterIds.add(this.regionServer.clusterId); 2366 } 2367 try { 2368 checkOpen(); 2369 requestCount.increment(); 2370 HRegion region = getRegion(request.getRegion()); 2371 Map<byte[], List<Path>> map = null; 2372 final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration()); 2373 long sizeToBeLoaded = -1; 2374 2375 // Check to see if this bulk load would exceed the space quota for this table 2376 if (spaceQuotaEnabled) { 2377 ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements(); 2378 SpaceViolationPolicyEnforcement enforcement = activeSpaceQuotas.getPolicyEnforcement( 2379 region); 2380 if (enforcement != null) { 2381 // Bulk loads must still be atomic. We must enact all or none. 
2382 List<String> filePaths = new ArrayList<>(request.getFamilyPathCount()); 2383 for (FamilyPath familyPath : request.getFamilyPathList()) { 2384 filePaths.add(familyPath.getPath()); 2385 } 2386 // Check if the batch of files exceeds the current quota 2387 sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths); 2388 } 2389 } 2390 2391 List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount()); 2392 for (FamilyPath familyPath : request.getFamilyPathList()) { 2393 familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath())); 2394 } 2395 if (!request.hasBulkToken()) { 2396 if (region.getCoprocessorHost() != null) { 2397 region.getCoprocessorHost().preBulkLoadHFile(familyPaths); 2398 } 2399 try { 2400 map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null, 2401 request.getCopyFile(), clusterIds, request.getReplicate()); 2402 } finally { 2403 if (region.getCoprocessorHost() != null) { 2404 region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map); 2405 } 2406 } 2407 } else { 2408 // secure bulk load 2409 map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request, clusterIds); 2410 } 2411 BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder(); 2412 builder.setLoaded(map != null); 2413 if (map != null) { 2414 // Treat any negative size as a flag to "ignore" updating the region size as that is 2415 // not possible to occur in real life (cannot bulk load a file with negative size) 2416 if (spaceQuotaEnabled && sizeToBeLoaded > 0) { 2417 if (LOG.isTraceEnabled()) { 2418 LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by " 2419 + sizeToBeLoaded + " bytes"); 2420 } 2421 // Inform space quotas of the new files for this region 2422 getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize( 2423 region.getRegionInfo(), sizeToBeLoaded); 2424 } 2425 } 2426 return builder.build(); 2427 } catch (IOException 
ie) { 2428 throw new ServiceException(ie); 2429 } finally { 2430 if (regionServer.metricsRegionServer != null) { 2431 regionServer.metricsRegionServer.updateBulkLoad( 2432 EnvironmentEdgeManager.currentTime() - start); 2433 } 2434 } 2435 } 2436 2437 @Override 2438 public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller, 2439 PrepareBulkLoadRequest request) throws ServiceException { 2440 try { 2441 checkOpen(); 2442 requestCount.increment(); 2443 2444 HRegion region = getRegion(request.getRegion()); 2445 2446 String bulkToken = regionServer.secureBulkLoadManager.prepareBulkLoad(region, request); 2447 PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder(); 2448 builder.setBulkToken(bulkToken); 2449 return builder.build(); 2450 } catch (IOException ie) { 2451 throw new ServiceException(ie); 2452 } 2453 } 2454 2455 @Override 2456 public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller, 2457 CleanupBulkLoadRequest request) throws ServiceException { 2458 try { 2459 checkOpen(); 2460 requestCount.increment(); 2461 2462 HRegion region = getRegion(request.getRegion()); 2463 2464 regionServer.secureBulkLoadManager.cleanupBulkLoad(region, request); 2465 CleanupBulkLoadResponse response = CleanupBulkLoadResponse.newBuilder().build(); 2466 return response; 2467 } catch (IOException ie) { 2468 throw new ServiceException(ie); 2469 } 2470 } 2471 2472 @Override 2473 public CoprocessorServiceResponse execService(final RpcController controller, 2474 final CoprocessorServiceRequest request) throws ServiceException { 2475 try { 2476 checkOpen(); 2477 requestCount.increment(); 2478 HRegion region = getRegion(request.getRegion()); 2479 com.google.protobuf.Message result = execServiceOnRegion(region, request.getCall()); 2480 CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder(); 2481 builder.setRegion(RequestConverter.buildRegionSpecifier( 2482 RegionSpecifierType.REGION_NAME, 
  /**
   * Runs a coprocessor service call against {@code region}, ignoring the controller
   * that arrived inside the serialized call and substituting a fresh server-side one.
   */
  private com.google.protobuf.Message execServiceOnRegion(HRegion region,
      final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
    // ignore the passed in controller (from the serialized call)
    ServerRpcController execController = new ServerRpcController();
    return region.execService(execController, serviceCall);
  }

  /**
   * Get data from a table.
   *
   * @param controller the RPC controller
   * @param request the get request
   * @throws ServiceException
   */
  @Override
  public GetResponse get(final RpcController controller,
      final GetRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    OperationQuota quota = null;
    HRegion region = null;
    try {
      checkOpen();
      requestCount.increment();
      rpcGetRequestCount.increment();
      region = getRegion(request.getRegion());

      GetResponse.Builder builder = GetResponse.newBuilder();
      ClientProtos.Get get = request.getGet();
      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? " +
            "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " +
            "reverse Scan.");
      }
      Boolean existence = null;
      Result r = null;
      RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
      // Quota check can throw (e.g. throttling); happens before any read work.
      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);

      Get clientGet = ProtobufUtil.toGet(get);
      // Existence-only gets may be short-circuited entirely by the preExists CP hook.
      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
        existence = region.getCoprocessorHost().preExists(clientGet);
      }
      if (existence == null) {
        if (context != null) {
          // Normal RPC path: scanner cleanup is deferred to the call context callback.
          r = get(clientGet, (region), null, context);
        } else {
          // for test purpose
          r = region.get(clientGet);
        }
        if (get.getExistenceOnly()) {
          boolean exists = r.getExists();
          if (region.getCoprocessorHost() != null) {
            exists = region.getCoprocessorHost().postExists(clientGet, exists);
          }
          existence = exists;
        }
      }
      if (existence != null) {
        // Existence-only answer: encode the boolean; mark stale for non-default replicas.
        ClientProtos.Result pbr =
            ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
        builder.setResult(pbr);
      } else if (r != null) {
        ClientProtos.Result pbr;
        // Clients >= 1.3 that support cell blocks get the cells out-of-band via the
        // controller's cell scanner; older clients get cells inline in the protobuf.
        if (isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)) {
          pbr = ProtobufUtil.toResultNoData(r);
          ((HBaseRpcController) controller).setCellScanner(CellUtil.createCellScanner(r
              .rawCells()));
          addSize(context, r, null);
        } else {
          pbr = ProtobufUtil.toResult(r);
        }
        builder.setResult(pbr);
      }
      //r.cells is null when an table.exists(get) call
      if (r != null && r.rawCells() != null) {
        quota.addGetResult(r);
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      MetricsRegionServer mrs = regionServer.metricsRegionServer;
      if (mrs != null) {
        TableDescriptor td = region != null ? region.getTableDescriptor() : null;
        if (td != null) {
          mrs.updateGet(td.getTableName(), EnvironmentEdgeManager.currentTime() - before);
        }
      }
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Server-side get that routes through a region scanner so the scanner's cleanup can
   * be tied to the RPC call lifecycle.
   * @param closeCallBack aggregator used by multi(); when null the scanner is registered
   *        on {@code context} instead
   * @param context the current RPC call context
   *        NOTE(review): if both {@code closeCallBack} and {@code context} are null this
   *        NPEs in the finally block — callers visible here always pass a non-null
   *        context when closeCallBack is null; confirm for any new caller.
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
      RpcCallContext context) throws IOException {
    region.prepareGet(get);
    // Results from a non-default replica are flagged stale.
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    long before = EnvironmentEdgeManager.currentTime();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        // CP bypassed the core get; whatever it put in results is the answer.
        region.metricsUpdateForGet(results, before);
        return Result
            .create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet(results, before);

    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
  }
  /**
   * Sums the per-region action counts of a multi request and logs a warning (tagged
   * with the first region's name) when the batch exceeds {@code rowSizeWarnThreshold}.
   */
  private void checkBatchSizeAndLogLargeSize(MultiRequest request) {
    int sum = 0;
    String firstRegionName = null;
    for (RegionAction regionAction : request.getRegionActionList()) {
      if (sum == 0) {
        // Capture the first region's name for the warning message.
        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
      }
      sum += regionAction.getActionCount();
    }
    if (sum > rowSizeWarnThreshold) {
      ld.logBatchWarning(firstRegionName, sum, rowSizeWarnThreshold);
    }
  }

  /**
   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
   *
   * @param rpcc the RPC controller
   * @param request the multi request
   * @throws ServiceException
   */
  @Override
  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
      throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }

    checkBatchSizeAndLogLargeSize(request);

    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
    // Clear the scanner so we are not holding a reference across the call.
    if (controller != null) {
      controller.setCellScanner(null);
    }

    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;

    // this will contain all the cells that we need to return. It's created later, if needed.
    List<CellScannable> cellsToReturn = null;
    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
    Boolean processed = null;
    RegionScannersCloseCallBack closeCallBack = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    this.rpcMultiRequestCount.increment();
    this.requestCount.increment();
    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new HashMap<>(request
        .getRegionActionCount());
    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
    for (RegionAction regionAction : request.getRegionActionList()) {
      OperationQuota quota;
      HRegion region;
      // Builder is reused across iterations; must be cleared each time.
      regionActionResultBuilder.clear();
      RegionSpecifier regionSpecifier = regionAction.getRegion();
      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
      } catch (IOException e) {
        rpcServer.getMetrics().exception(e);
        regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
        // All Mutations in this RegionAction not executed as we can not see the Region online here
        // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
        // corresponding to these Mutations.
        skipCellsForMutations(regionAction.getActionList(), cellScanner);
        continue;  // For this region it's a failure.
      }

      if (regionAction.hasAtomic() && regionAction.getAtomic()) {
        // How does this call happen?  It may need some work to play well w/ the surroundings.
        // Need to return an item per Action along w/ Action index.  TODO.
        try {
          if (request.hasCondition()) {
            Condition condition = request.getCondition();
            byte[] row = condition.getRow().toByteArray();
            byte[] family = condition.getFamily().toByteArray();
            byte[] qualifier = condition.getQualifier().toByteArray();
            CompareOperator op =
                CompareOperator.valueOf(condition.getCompareType().name());
            ByteArrayComparable comparator =
                ProtobufUtil.toComparator(condition.getComparator());
            // Missing time range means "match any timestamp".
            TimeRange timeRange = condition.hasTimeRange() ?
                ProtobufUtil.toTimeRange(condition.getTimeRange()) :
                TimeRange.allTime();
            processed =
                checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
                    qualifier, op, comparator, timeRange, regionActionResultBuilder,
                    spaceQuotaEnforcement);
          } else {
            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
                cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
          }
        } catch (IOException e) {
          rpcServer.getMetrics().exception(e);
          // As it's atomic, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        }
      } else {
        // doNonAtomicRegionMutation manages the exception internally
        if (context != null && closeCallBack == null) {
          // An RpcCallBack that creates a list of scanners that needs to perform callBack
          // operation on completion of multiGets.
          // Set this only once
          closeCallBack = new RegionScannersCloseCallBack();
          context.setCallBack(closeCallBack);
        }
        cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
            spaceQuotaEnforcement);
      }
      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      quota.close();
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        regionStats.put(regionSpecifier, regionLoadStats);
      }
    }
    // Load the controller with the Cells to return.
    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
    }

    if (processed != null) {
      responseBuilder.setProcessed(processed);
    }

    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
      builder.addRegion(stat.getKey());
      builder.addStat(stat.getValue());
    }
    responseBuilder.setRegionStatistics(builder);
    return responseBuilder.build();
  }

  /**
   * Advances the shared cell scanner past the cells of every mutation in
   * {@code actions} (used when a whole RegionAction is rejected up front).
   */
  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    for (Action action : actions) {
      skipCellsForMutation(action, cellScanner);
    }
  }

  /**
   * Advances the shared cell scanner past the cells belonging to one mutation,
   * keeping the scanner aligned for subsequent RegionActions.
   */
  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    try {
      if (action.hasMutation()) {
        MutationProto m = action.getMutation();
        if (m.hasAssociatedCellCount()) {
          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
            cellScanner.advance();
          }
        }
      }
    } catch (IOException e) {
      // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
      // marked as failed as we could not see the Region here. At client side the top level
      // RegionAction exception will be considered first.
      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
    }
  }
  /**
   * Mutate data in a table.
   *
   * @param rpcc the RPC controller
   * @param request the mutate request
   */
  @Override
  public MutateResponse mutate(final RpcController rpcc,
      final MutateRequest request) throws ServiceException {
    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
    OperationQuota quota = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    ActivePolicyEnforcement spaceQuotaEnforcement = null;
    MutationType type = null;
    HRegion region = null;
    long before = EnvironmentEdgeManager.currentTime();
    // Clear scanner so we are not holding on to reference across call.
    if (controller != null) {
      controller.setCellScanner(null);
    }
    try {
      checkOpen();
      requestCount.increment();
      rpcMutateRequestCount.increment();
      region = getRegion(request.getRegion());
      MutateResponse.Builder builder = MutateResponse.newBuilder();
      MutationProto mutation = request.getMutation();
      // User-table writes may trigger memstore reclamation; meta is exempt.
      if (!region.getRegionInfo().isMetaRegion()) {
        regionServer.cacheFlusher.reclaimMemStoreMemory();
      }
      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
      Result r = null;
      Boolean processed = null;
      type = mutation.getMutateType();

      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
      spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();

      switch (type) {
        case APPEND:
          // TODO: this doesn't actually check anything.
          r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
          break;
        case INCREMENT:
          // TODO: this doesn't actually check anything.
          r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
          break;
        case PUT:
          Put put = ProtobufUtil.toPut(mutation, cellScanner);
          checkCellSizeLimit(region, put);
          // Throws an exception when violated
          spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
          quota.addMutation(put);
          if (request.hasCondition()) {
            // checkAndPut path: CP preCheckAndPut may decide the outcome outright
            // (non-null return short-circuits the core checkAndMutate below).
            Condition condition = request.getCondition();
            byte[] row = condition.getRow().toByteArray();
            byte[] family = condition.getFamily().toByteArray();
            byte[] qualifier = condition.getQualifier().toByteArray();
            CompareOperator compareOp =
                CompareOperator.valueOf(condition.getCompareType().name());
            ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
            // Missing time range means "match any timestamp".
            TimeRange timeRange = condition.hasTimeRange() ?
                ProtobufUtil.toTimeRange(condition.getTimeRange()) :
                TimeRange.allTime();
            if (region.getCoprocessorHost() != null) {
              processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
                  compareOp, comparator, put);
            }
            if (processed == null) {
              boolean result = region.checkAndMutate(row, family,
                  qualifier, compareOp, comparator, timeRange, put);
              if (region.getCoprocessorHost() != null) {
                result = region.getCoprocessorHost().postCheckAndPut(row, family,
                    qualifier, compareOp, comparator, put, result);
              }
              processed = result;
            }
          } else {
            region.put(put);
            processed = Boolean.TRUE;
          }
          break;
        case DELETE:
          Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
          checkCellSizeLimit(region, delete);
          // Throws an exception when the space quota policy is violated.
          spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
          quota.addMutation(delete);
          if (request.hasCondition()) {
            // checkAndDelete path; mirrors the checkAndPut flow above.
            Condition condition = request.getCondition();
            byte[] row = condition.getRow().toByteArray();
            byte[] family = condition.getFamily().toByteArray();
            byte[] qualifier = condition.getQualifier().toByteArray();
            CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name());
            ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
            TimeRange timeRange = condition.hasTimeRange() ?
                ProtobufUtil.toTimeRange(condition.getTimeRange()) :
                TimeRange.allTime();
            if (region.getCoprocessorHost() != null) {
              processed = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op,
                  comparator, delete);
            }
            if (processed == null) {
              boolean result = region.checkAndMutate(row, family,
                  qualifier, op, comparator, timeRange, delete);
              if (region.getCoprocessorHost() != null) {
                result = region.getCoprocessorHost().postCheckAndDelete(row, family,
                    qualifier, op, comparator, delete, result);
              }
              processed = result;
            }
          } else {
            region.delete(delete);
            processed = Boolean.TRUE;
          }
          break;
        default:
          throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
      }
      if (processed != null) {
        builder.setProcessed(processed.booleanValue());
      }
      boolean clientCellBlockSupported = isClientCellBlockSupport(context);
      addResult(builder, r, controller, clientCellBlockSupported);
      if (clientCellBlockSupported) {
        addSize(context, r, null);
      }
      return builder.build();
    } catch (IOException ie) {
      // An IOException here may indicate FS trouble; probe it before failing the call.
      regionServer.checkFileSystem();
      throw new ServiceException(ie);
    } finally {
      if (quota != null) {
        quota.close();
      }
      // Update metrics
      // (APPEND/INCREMENT fall through to the default case here; presumably their
      // latencies are recorded inside append()/increment() — confirm before adding cases.)
      if (regionServer.metricsRegionServer != null && type != null) {
        long after = EnvironmentEdgeManager.currentTime();
        switch (type) {
          case DELETE:
            if (request.hasCondition()) {
              regionServer.metricsRegionServer.updateCheckAndDelete(after - before);
            } else {
              regionServer.metricsRegionServer.updateDelete(
                  region == null ? null : region.getRegionInfo().getTable(), after - before);
            }
            break;
          case PUT:
            if (request.hasCondition()) {
              regionServer.metricsRegionServer.updateCheckAndPut(after - before);
            } else {
              regionServer.metricsRegionServer.updatePut(
                  region == null ? null : region.getRegionInfo().getTable(), after - before);
            }
            break;
          default:
            break;
        }
      }
    }
  }
null : region.getRegionInfo().getTable(),after - before); 2969 } 2970 break; 2971 default: 2972 break; 2973 2974 } 2975 } 2976 } 2977 } 2978 2979 // This is used to keep compatible with the old client implementation. Consider remove it if we 2980 // decide to drop the support of the client that still sends close request to a region scanner 2981 // which has already been exhausted. 2982 @Deprecated 2983 private static final IOException SCANNER_ALREADY_CLOSED = new IOException() { 2984 2985 private static final long serialVersionUID = -4305297078988180130L; 2986 2987 @Override 2988 public synchronized Throwable fillInStackTrace() { 2989 return this; 2990 } 2991 }; 2992 2993 private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException { 2994 String scannerName = Long.toString(request.getScannerId()); 2995 RegionScannerHolder rsh = scanners.get(scannerName); 2996 if (rsh == null) { 2997 // just ignore the next or close request if scanner does not exists. 2998 if (closedScanners.getIfPresent(scannerName) != null) { 2999 throw SCANNER_ALREADY_CLOSED; 3000 } else { 3001 LOG.warn("Client tried to access missing scanner " + scannerName); 3002 throw new UnknownScannerException( 3003 "Unknown scanner '" + scannerName + "'. 
This can happen due to any of the following " + 3004 "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " + 3005 "long wait between consecutive client checkins, c) Server may be closing down, " + 3006 "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " + 3007 "possible fix would be increasing the value of" + 3008 "'hbase.client.scanner.timeout.period' configuration."); 3009 } 3010 } 3011 RegionInfo hri = rsh.s.getRegionInfo(); 3012 // Yes, should be the same instance 3013 if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) { 3014 String msg = "Region has changed on the scanner " + scannerName + ": regionName=" 3015 + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r; 3016 LOG.warn(msg + ", closing..."); 3017 scanners.remove(scannerName); 3018 try { 3019 rsh.s.close(); 3020 } catch (IOException e) { 3021 LOG.warn("Getting exception closing " + scannerName, e); 3022 } finally { 3023 try { 3024 regionServer.leases.cancelLease(scannerName); 3025 } catch (LeaseException e) { 3026 LOG.warn("Getting exception closing " + scannerName, e); 3027 } 3028 } 3029 throw new NotServingRegionException(msg); 3030 } 3031 return rsh; 3032 } 3033 3034 private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.Builder builder) 3035 throws IOException { 3036 HRegion region = getRegion(request.getRegion()); 3037 ClientProtos.Scan protoScan = request.getScan(); 3038 boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); 3039 Scan scan = ProtobufUtil.toScan(protoScan); 3040 // if the request doesn't set this, get the default region setting. 
3041 if (!isLoadingCfsOnDemandSet) { 3042 scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); 3043 } 3044 3045 if (!scan.hasFamilies()) { 3046 // Adding all families to scanner 3047 for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) { 3048 scan.addFamily(family); 3049 } 3050 } 3051 if (region.getCoprocessorHost() != null) { 3052 // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a 3053 // wrapper for the core created RegionScanner 3054 region.getCoprocessorHost().preScannerOpen(scan); 3055 } 3056 RegionScannerImpl coreScanner = region.getScanner(scan); 3057 Shipper shipper = coreScanner; 3058 RegionScanner scanner = coreScanner; 3059 if (region.getCoprocessorHost() != null) { 3060 scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); 3061 } 3062 long scannerId = scannerIdGenerator.generateNewScannerId(); 3063 builder.setScannerId(scannerId); 3064 builder.setMvccReadPoint(scanner.getMvccReadPoint()); 3065 builder.setTtl(scannerLeaseTimeoutPeriod); 3066 String scannerName = String.valueOf(scannerId); 3067 return addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult()); 3068 } 3069 3070 private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh) 3071 throws OutOfOrderScannerNextException { 3072 // if nextCallSeq does not match throw Exception straight away. This needs to be 3073 // performed even before checking of Lease. 
3074 // See HBASE-5974 3075 if (request.hasNextCallSeq()) { 3076 long callSeq = request.getNextCallSeq(); 3077 if (!rsh.incNextCallSeq(callSeq)) { 3078 throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.getNextCallSeq() 3079 + " But the nextCallSeq got from client: " + request.getNextCallSeq() + "; request=" 3080 + TextFormat.shortDebugString(request)); 3081 } 3082 } 3083 } 3084 3085 private void addScannerLeaseBack(Leases.Lease lease) { 3086 try { 3087 regionServer.leases.addLease(lease); 3088 } catch (LeaseStillHeldException e) { 3089 // should not happen as the scanner id is unique. 3090 throw new AssertionError(e); 3091 } 3092 } 3093 3094 private long getTimeLimit(HBaseRpcController controller, boolean allowHeartbeatMessages) { 3095 // Set the time limit to be half of the more restrictive timeout value (one of the 3096 // timeout values must be positive). In the event that both values are positive, the 3097 // more restrictive of the two is used to calculate the limit. 3098 if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) { 3099 long timeLimitDelta; 3100 if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) { 3101 timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout); 3102 } else { 3103 timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout; 3104 } 3105 if (controller != null && controller.getCallTimeout() > 0) { 3106 timeLimitDelta = Math.min(timeLimitDelta, controller.getCallTimeout()); 3107 } 3108 // Use half of whichever timeout value was more restrictive... But don't allow 3109 // the time limit to be less than the allowable minimum (could cause an 3110 // immediatate timeout before scanning any data). 3111 timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta); 3112 // XXX: Can not use EnvironmentEdge here because TestIncrementTimeRange use a 3113 // ManualEnvironmentEdge. Consider using System.nanoTime instead. 
3114 return System.currentTimeMillis() + timeLimitDelta; 3115 } 3116 // Default value of timeLimit is negative to indicate no timeLimit should be 3117 // enforced. 3118 return -1L; 3119 } 3120 3121 private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows, 3122 ScannerContext scannerContext, ScanResponse.Builder builder) { 3123 if (numOfCompleteRows >= limitOfRows) { 3124 if (LOG.isTraceEnabled()) { 3125 LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows + 3126 " scannerContext: " + scannerContext); 3127 } 3128 builder.setMoreResults(false); 3129 } 3130 } 3131 3132 // return whether we have more results in region. 3133 private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh, 3134 long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results, 3135 ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCallContext context) 3136 throws IOException { 3137 HRegion region = rsh.r; 3138 RegionScanner scanner = rsh.s; 3139 long maxResultSize; 3140 if (scanner.getMaxResultSize() > 0) { 3141 maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize); 3142 } else { 3143 maxResultSize = maxQuotaResultSize; 3144 } 3145 // This is cells inside a row. Default size is 10 so if many versions or many cfs, 3146 // then we'll resize. Resizings show in profiler. Set it higher than 10. For now 3147 // arbitrary 32. TODO: keep record of general size of results being returned. 
3148 List<Cell> values = new ArrayList<>(32); 3149 region.startRegionOperation(Operation.SCAN); 3150 try { 3151 int numOfResults = 0; 3152 int numOfCompleteRows = 0; 3153 long before = EnvironmentEdgeManager.currentTime(); 3154 synchronized (scanner) { 3155 boolean stale = (region.getRegionInfo().getReplicaId() != 0); 3156 boolean clientHandlesPartials = 3157 request.hasClientHandlesPartials() && request.getClientHandlesPartials(); 3158 boolean clientHandlesHeartbeats = 3159 request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats(); 3160 3161 // On the server side we must ensure that the correct ordering of partial results is 3162 // returned to the client to allow them to properly reconstruct the partial results. 3163 // If the coprocessor host is adding to the result list, we cannot guarantee the 3164 // correct ordering of partial results and so we prevent partial results from being 3165 // formed. 3166 boolean serverGuaranteesOrderOfPartials = results.isEmpty(); 3167 boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials; 3168 boolean moreRows = false; 3169 3170 // Heartbeat messages occur when the processing of the ScanRequest is exceeds a 3171 // certain time threshold on the server. When the time threshold is exceeded, the 3172 // server stops the scan and sends back whatever Results it has accumulated within 3173 // that time period (may be empty). Since heartbeat messages have the potential to 3174 // create partial Results (in the event that the timeout occurs in the middle of a 3175 // row), we must only generate heartbeat messages when the client can handle both 3176 // heartbeats AND partials 3177 boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults; 3178 3179 long timeLimit = getTimeLimit(controller, allowHeartbeatMessages); 3180 3181 final LimitScope sizeScope = 3182 allowPartialResults ? 
LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; 3183 final LimitScope timeScope = 3184 allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; 3185 3186 boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics(); 3187 3188 // Configure with limits for this RPC. Set keep progress true since size progress 3189 // towards size limit should be kept between calls to nextRaw 3190 ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); 3191 // maxResultSize - either we can reach this much size for all cells(being read) data or sum 3192 // of heap size occupied by cells(being read). Cell data means its key and value parts. 3193 contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize); 3194 contextBuilder.setBatchLimit(scanner.getBatch()); 3195 contextBuilder.setTimeLimit(timeScope, timeLimit); 3196 contextBuilder.setTrackMetrics(trackMetrics); 3197 ScannerContext scannerContext = contextBuilder.build(); 3198 boolean limitReached = false; 3199 while (numOfResults < maxResults) { 3200 // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The 3201 // batch limit is a limit on the number of cells per Result. Thus, if progress is 3202 // being tracked (i.e. scannerContext.keepProgress() is true) then we need to 3203 // reset the batch progress between nextRaw invocations since we don't want the 3204 // batch progress from previous calls to affect future calls 3205 scannerContext.setBatchProgress(0); 3206 3207 // Collect values to be returned here 3208 moreRows = scanner.nextRaw(values, scannerContext); 3209 3210 if (!values.isEmpty()) { 3211 if (limitOfRows > 0) { 3212 // First we need to check if the last result is partial and we have a row change. If 3213 // so then we need to increase the numOfCompleteRows. 
3214 if (results.isEmpty()) { 3215 if (rsh.rowOfLastPartialResult != null && 3216 !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)) { 3217 numOfCompleteRows++; 3218 checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, 3219 builder); 3220 } 3221 } else { 3222 Result lastResult = results.get(results.size() - 1); 3223 if (lastResult.mayHaveMoreCellsInRow() && 3224 !CellUtil.matchingRows(values.get(0), lastResult.getRow())) { 3225 numOfCompleteRows++; 3226 checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, 3227 builder); 3228 } 3229 } 3230 if (builder.hasMoreResults() && !builder.getMoreResults()) { 3231 break; 3232 } 3233 } 3234 boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow(); 3235 Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow); 3236 lastBlock.setValue(addSize(context, r, lastBlock.getValue())); 3237 results.add(r); 3238 numOfResults++; 3239 if (!mayHaveMoreCellsInRow && limitOfRows > 0) { 3240 numOfCompleteRows++; 3241 checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder); 3242 if (builder.hasMoreResults() && !builder.getMoreResults()) { 3243 break; 3244 } 3245 } 3246 } else if (!moreRows && !results.isEmpty()) { 3247 // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of 3248 // last result is false. Otherwise it's possible that: the first nextRaw returned 3249 // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the 3250 // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will 3251 // return with moreRows=false, which means moreResultsInRegion would be false, it will 3252 // be a contradictory state (HBASE-21206). 
3253 int lastIdx = results.size() - 1; 3254 Result r = results.get(lastIdx); 3255 if (r.mayHaveMoreCellsInRow()) { 3256 results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false)); 3257 } 3258 } 3259 boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS); 3260 boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS); 3261 boolean resultsLimitReached = numOfResults >= maxResults; 3262 limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached; 3263 3264 if (limitReached || !moreRows) { 3265 // We only want to mark a ScanResponse as a heartbeat message in the event that 3266 // there are more values to be read server side. If there aren't more values, 3267 // marking it as a heartbeat is wasteful because the client will need to issue 3268 // another ScanRequest only to realize that they already have all the values 3269 if (moreRows && timeLimitReached) { 3270 // Heartbeat messages occur when the time limit has been reached. 3271 builder.setHeartbeatMessage(true); 3272 if (rsh.needCursor) { 3273 Cell cursorCell = scannerContext.getLastPeekedCell(); 3274 if (cursorCell != null) { 3275 builder.setCursor(ProtobufUtil.toCursor(cursorCell)); 3276 } 3277 } 3278 } 3279 break; 3280 } 3281 values.clear(); 3282 } 3283 builder.setMoreResultsInRegion(moreRows); 3284 // Check to see if the client requested that we track metrics server side. If the 3285 // client requested metrics, retrieve the metrics from the scanner context. 
3286 if (trackMetrics) { 3287 Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap(); 3288 ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder(); 3289 NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder(); 3290 3291 for (Entry<String, Long> entry : metrics.entrySet()) { 3292 pairBuilder.setName(entry.getKey()); 3293 pairBuilder.setValue(entry.getValue()); 3294 metricBuilder.addMetrics(pairBuilder.build()); 3295 } 3296 3297 builder.setScanMetrics(metricBuilder.build()); 3298 } 3299 } 3300 long end = EnvironmentEdgeManager.currentTime(); 3301 long responseCellSize = context != null ? context.getResponseCellSize() : 0; 3302 region.getMetrics().updateScanTime(end - before); 3303 if (regionServer.metricsRegionServer != null) { 3304 regionServer.metricsRegionServer.updateScanSize( 3305 region.getTableDescriptor().getTableName(), responseCellSize); 3306 regionServer.metricsRegionServer.updateScanTime( 3307 region.getTableDescriptor().getTableName(), end - before); 3308 } 3309 } finally { 3310 region.closeRegionOperation(); 3311 } 3312 // coprocessor postNext hook 3313 if (region.getCoprocessorHost() != null) { 3314 region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true); 3315 } 3316 } 3317 3318 /** 3319 * Scan data in a table. 3320 * 3321 * @param controller the RPC controller 3322 * @param request the scan request 3323 * @throws ServiceException 3324 */ 3325 @Override 3326 public ScanResponse scan(final RpcController controller, final ScanRequest request) 3327 throws ServiceException { 3328 if (controller != null && !(controller instanceof HBaseRpcController)) { 3329 throw new UnsupportedOperationException( 3330 "We only do " + "HBaseRpcControllers! 
FIX IF A PROBLEM: " + controller); 3331 } 3332 if (!request.hasScannerId() && !request.hasScan()) { 3333 throw new ServiceException( 3334 new DoNotRetryIOException("Missing required input: scannerId or scan")); 3335 } 3336 try { 3337 checkOpen(); 3338 } catch (IOException e) { 3339 if (request.hasScannerId()) { 3340 String scannerName = Long.toString(request.getScannerId()); 3341 if (LOG.isDebugEnabled()) { 3342 LOG.debug( 3343 "Server shutting down and client tried to access missing scanner " + scannerName); 3344 } 3345 if (regionServer.leases != null) { 3346 try { 3347 regionServer.leases.cancelLease(scannerName); 3348 } catch (LeaseException le) { 3349 // No problem, ignore 3350 if (LOG.isTraceEnabled()) { 3351 LOG.trace("Un-able to cancel lease of scanner. It could already be closed."); 3352 } 3353 } 3354 } 3355 } 3356 throw new ServiceException(e); 3357 } 3358 requestCount.increment(); 3359 rpcScanRequestCount.increment(); 3360 RegionScannerHolder rsh; 3361 ScanResponse.Builder builder = ScanResponse.newBuilder(); 3362 try { 3363 if (request.hasScannerId()) { 3364 // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000 3365 // for more details. 3366 builder.setScannerId(request.getScannerId()); 3367 rsh = getRegionScanner(request); 3368 } else { 3369 rsh = newRegionScanner(request, builder); 3370 } 3371 } catch (IOException e) { 3372 if (e == SCANNER_ALREADY_CLOSED) { 3373 // Now we will close scanner automatically if there are no more results for this region but 3374 // the old client will still send a close request to us. Just ignore it and return. 3375 return builder.build(); 3376 } 3377 throw new ServiceException(e); 3378 } 3379 HRegion region = rsh.r; 3380 String scannerName = rsh.scannerName; 3381 Leases.Lease lease; 3382 try { 3383 // Remove lease while its being processed in server; protects against case 3384 // where processing of request takes > lease expiration time. 
3385 lease = regionServer.leases.removeLease(scannerName); 3386 } catch (LeaseException e) { 3387 throw new ServiceException(e); 3388 } 3389 if (request.hasRenew() && request.getRenew()) { 3390 // add back and return 3391 addScannerLeaseBack(lease); 3392 try { 3393 checkScanNextCallSeq(request, rsh); 3394 } catch (OutOfOrderScannerNextException e) { 3395 throw new ServiceException(e); 3396 } 3397 return builder.build(); 3398 } 3399 OperationQuota quota; 3400 try { 3401 quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN); 3402 } catch (IOException e) { 3403 addScannerLeaseBack(lease); 3404 throw new ServiceException(e); 3405 } 3406 try { 3407 checkScanNextCallSeq(request, rsh); 3408 } catch (OutOfOrderScannerNextException e) { 3409 addScannerLeaseBack(lease); 3410 throw new ServiceException(e); 3411 } 3412 // Now we have increased the next call sequence. If we give client an error, the retry will 3413 // never success. So we'd better close the scanner and return a DoNotRetryIOException to client 3414 // and then client will try to open a new scanner. 3415 boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false; 3416 int rows; // this is scan.getCaching 3417 if (request.hasNumberOfRows()) { 3418 rows = request.getNumberOfRows(); 3419 } else { 3420 rows = closeScanner ? 0 : 1; 3421 } 3422 RpcCallContext context = RpcServer.getCurrentCall().orElse(null); 3423 // now let's do the real scan. 3424 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); 3425 RegionScanner scanner = rsh.s; 3426 // this is the limit of rows for this scan, if we the number of rows reach this value, we will 3427 // close the scanner. 
3428 int limitOfRows; 3429 if (request.hasLimitOfRows()) { 3430 limitOfRows = request.getLimitOfRows(); 3431 } else { 3432 limitOfRows = -1; 3433 } 3434 MutableObject<Object> lastBlock = new MutableObject<>(); 3435 boolean scannerClosed = false; 3436 try { 3437 List<Result> results = new ArrayList<>(Math.min(rows, 512)); 3438 if (rows > 0) { 3439 boolean done = false; 3440 // Call coprocessor. Get region info from scanner. 3441 if (region.getCoprocessorHost() != null) { 3442 Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows); 3443 if (!results.isEmpty()) { 3444 for (Result r : results) { 3445 lastBlock.setValue(addSize(context, r, lastBlock.getValue())); 3446 } 3447 } 3448 if (bypass != null && bypass.booleanValue()) { 3449 done = true; 3450 } 3451 } 3452 if (!done) { 3453 scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows, 3454 results, builder, lastBlock, context); 3455 } else { 3456 builder.setMoreResultsInRegion(!results.isEmpty()); 3457 } 3458 } else { 3459 // This is a open scanner call with numberOfRow = 0, so set more results in region to true. 3460 builder.setMoreResultsInRegion(true); 3461 } 3462 3463 quota.addScanResult(results); 3464 addResults(builder, results, (HBaseRpcController) controller, 3465 RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()), 3466 isClientCellBlockSupport(context)); 3467 if (scanner.isFilterDone() && results.isEmpty()) { 3468 // If the scanner's filter - if any - is done with the scan 3469 // only set moreResults to false if the results is empty. This is used to keep compatible 3470 // with the old scan implementation where we just ignore the returned results if moreResults 3471 // is false. Can remove the isEmpty check after we get rid of the old implementation. 3472 builder.setMoreResults(false); 3473 } 3474 // Later we may close the scanner depending on this flag so here we need to make sure that we 3475 // have already set this flag. 
3476 assert builder.hasMoreResultsInRegion(); 3477 // we only set moreResults to false in the above code, so set it to true if we haven't set it 3478 // yet. 3479 if (!builder.hasMoreResults()) { 3480 builder.setMoreResults(true); 3481 } 3482 if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) { 3483 // Record the last cell of the last result if it is a partial result 3484 // We need this to calculate the complete rows we have returned to client as the 3485 // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the 3486 // current row. We may filter out all the remaining cells for the current row and just 3487 // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to 3488 // check for row change. 3489 Result lastResult = results.get(results.size() - 1); 3490 if (lastResult.mayHaveMoreCellsInRow()) { 3491 rsh.rowOfLastPartialResult = lastResult.getRow(); 3492 } else { 3493 rsh.rowOfLastPartialResult = null; 3494 } 3495 } 3496 if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) { 3497 scannerClosed = true; 3498 closeScanner(region, scanner, scannerName, context); 3499 } 3500 return builder.build(); 3501 } catch (IOException e) { 3502 try { 3503 // scanner is closed here 3504 scannerClosed = true; 3505 // The scanner state might be left in a dirty state, so we will tell the Client to 3506 // fail this RPC and close the scanner while opening up another one from the start of 3507 // row that the client has last seen. 3508 closeScanner(region, scanner, scannerName, context); 3509 3510 // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is 3511 // used in two different semantics. 3512 // (1) The first is to close the client scanner and bubble up the exception all the way 3513 // to the application. This is preferred when the exception is really un-recoverable 3514 // (like CorruptHFileException, etc). 
Plain DoNotRetryIOException also falls into this 3515 // bucket usually. 3516 // (2) Second semantics is to close the current region scanner only, but continue the 3517 // client scanner by overriding the exception. This is usually UnknownScannerException, 3518 // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the 3519 // application-level ClientScanner has to continue without bubbling up the exception to 3520 // the client. See ClientScanner code to see how it deals with these special exceptions. 3521 if (e instanceof DoNotRetryIOException) { 3522 throw e; 3523 } 3524 3525 // If it is a FileNotFoundException, wrap as a 3526 // DoNotRetryIOException. This can avoid the retry in ClientScanner. 3527 if (e instanceof FileNotFoundException) { 3528 throw new DoNotRetryIOException(e); 3529 } 3530 3531 // We closed the scanner already. Instead of throwing the IOException, and client 3532 // retrying with the same scannerId only to get USE on the next RPC, we directly throw 3533 // a special exception to save an RPC. 3534 if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) { 3535 // 1.4.0+ clients know how to handle 3536 throw new ScannerResetException("Scanner is closed on the server-side", e); 3537 } else { 3538 // older clients do not know about SRE. Just throw USE, which they will handle 3539 throw new UnknownScannerException("Throwing UnknownScannerException to reset the client" 3540 + " scanner state for clients older than 1.3.", e); 3541 } 3542 } catch (IOException ioe) { 3543 throw new ServiceException(ioe); 3544 } 3545 } finally { 3546 if (!scannerClosed) { 3547 // Adding resets expiration time on lease. 3548 // the closeCallBack will be set in closeScanner so here we only care about shippedCallback 3549 if (context != null) { 3550 context.setCallBack(rsh.shippedCallback); 3551 } else { 3552 // When context != null, adding back the lease will be done in callback set above. 
3553 addScannerLeaseBack(lease); 3554 } 3555 } 3556 quota.close(); 3557 } 3558 } 3559 3560 private void closeScanner(HRegion region, RegionScanner scanner, String scannerName, 3561 RpcCallContext context) throws IOException { 3562 if (region.getCoprocessorHost() != null) { 3563 if (region.getCoprocessorHost().preScannerClose(scanner)) { 3564 // bypass the actual close. 3565 return; 3566 } 3567 } 3568 RegionScannerHolder rsh = scanners.remove(scannerName); 3569 if (rsh != null) { 3570 if (context != null) { 3571 context.setCallBack(rsh.closeCallBack); 3572 } else { 3573 rsh.s.close(); 3574 } 3575 if (region.getCoprocessorHost() != null) { 3576 region.getCoprocessorHost().postScannerClose(scanner); 3577 } 3578 closedScanners.put(scannerName, scannerName); 3579 } 3580 } 3581 3582 @Override 3583 public CoprocessorServiceResponse execRegionServerService(RpcController controller, 3584 CoprocessorServiceRequest request) throws ServiceException { 3585 rpcPreCheck("execRegionServerService"); 3586 return regionServer.execRegionServerService(controller, request); 3587 } 3588 3589 @Override 3590 public UpdateConfigurationResponse updateConfiguration( 3591 RpcController controller, UpdateConfigurationRequest request) 3592 throws ServiceException { 3593 try { 3594 this.regionServer.updateConfiguration(); 3595 } catch (Exception e) { 3596 throw new ServiceException(e); 3597 } 3598 return UpdateConfigurationResponse.getDefaultInstance(); 3599 } 3600 3601 @Override 3602 public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots( 3603 RpcController controller, GetSpaceQuotaSnapshotsRequest request) throws ServiceException { 3604 try { 3605 final RegionServerSpaceQuotaManager manager = 3606 regionServer.getRegionServerSpaceQuotaManager(); 3607 final GetSpaceQuotaSnapshotsResponse.Builder builder = 3608 GetSpaceQuotaSnapshotsResponse.newBuilder(); 3609 if (manager != null) { 3610 final Map<TableName,SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots(); 3611 for 
(Entry<TableName,SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) { 3612 builder.addSnapshots(TableQuotaSnapshot.newBuilder() 3613 .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey())) 3614 .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())) 3615 .build()); 3616 } 3617 } 3618 return builder.build(); 3619 } catch (Exception e) { 3620 throw new ServiceException(e); 3621 } 3622 } 3623 3624 @Override 3625 public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller, 3626 ClearRegionBlockCacheRequest request) { 3627 ClearRegionBlockCacheResponse.Builder builder = 3628 ClearRegionBlockCacheResponse.newBuilder(); 3629 CacheEvictionStatsBuilder stats = CacheEvictionStats.builder(); 3630 List<HRegion> regions = getRegions(request.getRegionList(), stats); 3631 for (HRegion region : regions) { 3632 try { 3633 stats = stats.append(this.regionServer.clearRegionBlockCache(region)); 3634 } catch (Exception e) { 3635 stats.addException(region.getRegionInfo().getRegionName(), e); 3636 } 3637 } 3638 stats.withMaxCacheSize(regionServer.getBlockCache().map(BlockCache::getMaxSize).orElse(0L)); 3639 return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build(); 3640 } 3641 3642 private void executeOpenRegionProcedures(OpenRegionRequest request, 3643 Map<TableName, TableDescriptor> tdCache) { 3644 long masterSystemTime = request.hasMasterSystemTime() ? 
request.getMasterSystemTime() : -1; 3645 for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) { 3646 RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion()); 3647 TableDescriptor tableDesc = tdCache.get(regionInfo.getTable()); 3648 if (tableDesc == null) { 3649 try { 3650 tableDesc = regionServer.getTableDescriptors().get(regionInfo.getTable()); 3651 } catch (IOException e) { 3652 // Here we do not fail the whole method since we also need deal with other 3653 // procedures, and we can not ignore this one, so we still schedule a 3654 // AssignRegionHandler and it will report back to master if we still can not get the 3655 // TableDescriptor. 3656 LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler", 3657 regionInfo.getTable(), e); 3658 } 3659 } 3660 if (regionOpenInfo.getFavoredNodesCount() > 0) { 3661 regionServer.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(), 3662 regionOpenInfo.getFavoredNodesList()); 3663 } 3664 long procId = regionOpenInfo.getOpenProcId(); 3665 if (regionServer.submitRegionProcedure(procId)) { 3666 regionServer.executorService.submit(AssignRegionHandler 3667 .create(regionServer, regionInfo, procId, tableDesc, 3668 masterSystemTime)); 3669 } 3670 } 3671 } 3672 3673 private void executeCloseRegionProcedures(CloseRegionRequest request) { 3674 String encodedName; 3675 try { 3676 encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion()); 3677 } catch (DoNotRetryIOException e) { 3678 throw new UncheckedIOException("Should not happen", e); 3679 } 3680 ServerName destination = request.hasDestinationServer() ? 
3681 ProtobufUtil.toServerName(request.getDestinationServer()) : 3682 null; 3683 long procId = request.getCloseProcId(); 3684 if (regionServer.submitRegionProcedure(procId)) { 3685 regionServer.executorService.submit(UnassignRegionHandler 3686 .create(regionServer, encodedName, procId, false, destination)); 3687 } 3688 } 3689 3690 private void executeProcedures(RemoteProcedureRequest request) { 3691 RSProcedureCallable callable; 3692 try { 3693 callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class) 3694 .getDeclaredConstructor().newInstance(); 3695 } catch (Exception e) { 3696 LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(), 3697 request.getProcId(), e); 3698 regionServer.remoteProcedureComplete(request.getProcId(), e); 3699 return; 3700 } 3701 callable.init(request.getProcData().toByteArray(), regionServer); 3702 LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId()); 3703 regionServer.executeProcedure(request.getProcId(), callable); 3704 } 3705 3706 @Override 3707 @QosPriority(priority = HConstants.ADMIN_QOS) 3708 public ExecuteProceduresResponse executeProcedures(RpcController controller, 3709 ExecuteProceduresRequest request) throws ServiceException { 3710 try { 3711 checkOpen(); 3712 throwOnWrongStartCode(request); 3713 regionServer.getRegionServerCoprocessorHost().preExecuteProcedures(); 3714 if (request.getOpenRegionCount() > 0) { 3715 // Avoid reading from the TableDescritor every time(usually it will read from the file 3716 // system) 3717 Map<TableName, TableDescriptor> tdCache = new HashMap<>(); 3718 request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache)); 3719 } 3720 if (request.getCloseRegionCount() > 0) { 3721 request.getCloseRegionList().forEach(this::executeCloseRegionProcedures); 3722 } 3723 if (request.getProcCount() > 0) { 3724 request.getProcList().forEach(this::executeProcedures); 3725 } 3726 
regionServer.getRegionServerCoprocessorHost().postExecuteProcedures(); 3727 return ExecuteProceduresResponse.getDefaultInstance(); 3728 } catch (IOException e) { 3729 throw new ServiceException(e); 3730 } 3731 } 3732 3733 @VisibleForTesting 3734 public RpcScheduler getRpcScheduler() { 3735 return rpcServer.getScheduler(); 3736 } 3737}