001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.io.UncheckedIOException; 023import java.lang.reflect.InvocationTargetException; 024import java.lang.reflect.Method; 025import java.net.BindException; 026import java.net.InetSocketAddress; 027import java.nio.ByteBuffer; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.HashMap; 032import java.util.Iterator; 033import java.util.List; 034import java.util.Map; 035import java.util.Map.Entry; 036import java.util.NavigableMap; 037import java.util.Optional; 038import java.util.Set; 039import java.util.TreeSet; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.ConcurrentMap; 042import java.util.concurrent.TimeUnit; 043import java.util.concurrent.atomic.AtomicBoolean; 044import java.util.concurrent.atomic.AtomicLong; 045import java.util.concurrent.atomic.LongAdder; 046import org.apache.commons.lang3.mutable.MutableObject; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.fs.Path; 
049import org.apache.hadoop.hbase.ByteBufferExtendedCell; 050import org.apache.hadoop.hbase.CacheEvictionStats; 051import org.apache.hadoop.hbase.CacheEvictionStatsBuilder; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.CellScannable; 054import org.apache.hadoop.hbase.CellScanner; 055import org.apache.hadoop.hbase.CellUtil; 056import org.apache.hadoop.hbase.DoNotRetryIOException; 057import org.apache.hadoop.hbase.DroppedSnapshotException; 058import org.apache.hadoop.hbase.HBaseIOException; 059import org.apache.hadoop.hbase.HConstants; 060import org.apache.hadoop.hbase.MultiActionResultTooLarge; 061import org.apache.hadoop.hbase.NotServingRegionException; 062import org.apache.hadoop.hbase.PrivateCellUtil; 063import org.apache.hadoop.hbase.RegionTooBusyException; 064import org.apache.hadoop.hbase.Server; 065import org.apache.hadoop.hbase.ServerName; 066import org.apache.hadoop.hbase.TableName; 067import org.apache.hadoop.hbase.UnknownScannerException; 068import org.apache.hadoop.hbase.client.Append; 069import org.apache.hadoop.hbase.client.CheckAndMutate; 070import org.apache.hadoop.hbase.client.CheckAndMutateResult; 071import org.apache.hadoop.hbase.client.ConnectionUtils; 072import org.apache.hadoop.hbase.client.Delete; 073import org.apache.hadoop.hbase.client.Durability; 074import org.apache.hadoop.hbase.client.Get; 075import org.apache.hadoop.hbase.client.Increment; 076import org.apache.hadoop.hbase.client.Mutation; 077import org.apache.hadoop.hbase.client.Put; 078import org.apache.hadoop.hbase.client.RegionInfo; 079import org.apache.hadoop.hbase.client.RegionReplicaUtil; 080import org.apache.hadoop.hbase.client.Result; 081import org.apache.hadoop.hbase.client.Row; 082import org.apache.hadoop.hbase.client.Scan; 083import org.apache.hadoop.hbase.client.TableDescriptor; 084import org.apache.hadoop.hbase.client.VersionInfoUtil; 085import org.apache.hadoop.hbase.conf.ConfigurationObserver; 086import 
org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 087import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 088import org.apache.hadoop.hbase.exceptions.ScannerResetException; 089import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 090import org.apache.hadoop.hbase.io.ByteBuffAllocator; 091import org.apache.hadoop.hbase.io.hfile.BlockCache; 092import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; 093import org.apache.hadoop.hbase.ipc.HBaseRpcController; 094import org.apache.hadoop.hbase.ipc.PriorityFunction; 095import org.apache.hadoop.hbase.ipc.QosPriority; 096import org.apache.hadoop.hbase.ipc.RpcCallContext; 097import org.apache.hadoop.hbase.ipc.RpcCallback; 098import org.apache.hadoop.hbase.ipc.RpcScheduler; 099import org.apache.hadoop.hbase.ipc.RpcServer; 100import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 101import org.apache.hadoop.hbase.ipc.RpcServerFactory; 102import org.apache.hadoop.hbase.ipc.RpcServerInterface; 103import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 104import org.apache.hadoop.hbase.ipc.ServerRpcController; 105import org.apache.hadoop.hbase.log.HBaseMarkers; 106import org.apache.hadoop.hbase.master.HMaster; 107import org.apache.hadoop.hbase.master.MasterRpcServices; 108import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; 109import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; 110import org.apache.hadoop.hbase.namequeues.RpcLogDetails; 111import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; 112import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; 113import org.apache.hadoop.hbase.net.Address; 114import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 115import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; 116import org.apache.hadoop.hbase.quotas.OperationQuota; 117import org.apache.hadoop.hbase.quotas.QuotaUtil; 118import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 
119import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 120import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 121import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; 122import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; 123import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease; 124import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException; 125import org.apache.hadoop.hbase.regionserver.Region.Operation; 126import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 127import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 128import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler; 129import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; 130import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; 131import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; 132import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; 133import org.apache.hadoop.hbase.security.Superusers; 134import org.apache.hadoop.hbase.security.User; 135import org.apache.hadoop.hbase.security.access.AccessChecker; 136import org.apache.hadoop.hbase.security.access.NoopAccessChecker; 137import org.apache.hadoop.hbase.security.access.Permission; 138import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; 139import org.apache.hadoop.hbase.util.Bytes; 140import org.apache.hadoop.hbase.util.DNS; 141import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 142import org.apache.hadoop.hbase.util.Pair; 143import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 144import org.apache.hadoop.hbase.wal.WAL; 145import org.apache.hadoop.hbase.wal.WALEdit; 146import org.apache.hadoop.hbase.wal.WALKey; 147import org.apache.hadoop.hbase.wal.WALSplitUtil; 148import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 149import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 150import 
org.apache.yetus.audience.InterfaceAudience; 151import org.apache.zookeeper.KeeperException; 152import org.slf4j.Logger; 153import org.slf4j.LoggerFactory; 154 155import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 156import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 157import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 158import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 159import org.apache.hbase.thirdparty.com.google.protobuf.Message; 160import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 161import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 162import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 163import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 164import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 165 166import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 167import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 168import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 178import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 
199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; 220import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 241import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 249import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 250import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics; 251import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 252import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 253import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; 254import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 255import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; 256import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 257import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 258import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 259import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 260 261/** 262 * Implements the regionserver RPC services. 
263 */ 264@InterfaceAudience.Private 265@SuppressWarnings("deprecation") 266public class RSRpcServices implements HBaseRPCErrorHandler, 267 AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction, 268 ConfigurationObserver { 269 protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class); 270 271 /** RPC scheduler to use for the region server. */ 272 public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = 273 "hbase.region.server.rpc.scheduler.factory.class"; 274 275 /** RPC scheduler to use for the master. */ 276 public static final String MASTER_RPC_SCHEDULER_FACTORY_CLASS = 277 "hbase.master.rpc.scheduler.factory.class"; 278 279 /** 280 * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This 281 * configuration exists to prevent the scenario where a time limit is specified to be so 282 * restrictive that the time limit is reached immediately (before any cells are scanned). 283 */ 284 private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 285 "hbase.region.server.rpc.minimum.scan.time.limit.delta"; 286 /** 287 * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA} 288 */ 289 private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10; 290 291 /* 292 * Whether to reject rows with size > threshold defined by 293 * {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME} 294 */ 295 private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD = 296 "hbase.rpc.rows.size.threshold.reject"; 297 298 /* 299 * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD} 300 */ 301 private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false; 302 303 // Request counter. (Includes requests that are not serviced by regions.) 
304 // Count only once for requests with multiple actions like multi/caching-scan/replayBatch 305 final LongAdder requestCount = new LongAdder(); 306 307 // Request counter for rpc get 308 final LongAdder rpcGetRequestCount = new LongAdder(); 309 310 // Request counter for rpc scan 311 final LongAdder rpcScanRequestCount = new LongAdder(); 312 313 // Request counter for scans that might end up in full scans 314 final LongAdder rpcFullScanRequestCount = new LongAdder(); 315 316 // Request counter for rpc multi 317 final LongAdder rpcMultiRequestCount = new LongAdder(); 318 319 // Request counter for rpc mutate 320 final LongAdder rpcMutateRequestCount = new LongAdder(); 321 322 // Server to handle client requests. 323 final RpcServerInterface rpcServer; 324 final InetSocketAddress isa; 325 326 protected final HRegionServer regionServer; 327 private final long maxScannerResultSize; 328 329 // The reference to the priority extraction function 330 private final PriorityFunction priority; 331 332 private ScannerIdGenerator scannerIdGenerator; 333 private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>(); 334 // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients 335 // which may send next or close request to a region scanner which has already been exhausted. The 336 // entries will be removed automatically after scannerLeaseTimeoutPeriod. 337 private final Cache<String, String> closedScanners; 338 /** 339 * The lease timeout period for client scanners (milliseconds). 
340 */ 341 private final int scannerLeaseTimeoutPeriod; 342 343 /** 344 * The RPC timeout period (milliseconds) 345 */ 346 private final int rpcTimeout; 347 348 /** 349 * The minimum allowable delta to use for the scan limit 350 */ 351 private final long minimumScanTimeLimitDelta; 352 353 /** 354 * Row size threshold for multi requests above which a warning is logged 355 */ 356 private final int rowSizeWarnThreshold; 357 /* 358 * Whether we should reject requests with very high no of rows i.e. beyond threshold 359 * defined by rowSizeWarnThreshold 360 */ 361 private final boolean rejectRowsWithSizeOverThreshold; 362 363 final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false); 364 365 private AccessChecker accessChecker; 366 private ZKPermissionWatcher zkPermissionWatcher; 367 368 /** 369 * Services launched in RSRpcServices. By default they are on but you can use the below 370 * booleans to selectively enable/disable either Admin or Client Service (Rare is the case 371 * where you would ever turn off one or the other). 372 */ 373 public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG = 374 "hbase.regionserver.admin.executorService"; 375 public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG = 376 "hbase.regionserver.client.executorService"; 377 378 /** 379 * An Rpc callback for closing a RegionScanner. 380 */ 381 private static final class RegionScannerCloseCallBack implements RpcCallback { 382 383 private final RegionScanner scanner; 384 385 public RegionScannerCloseCallBack(RegionScanner scanner) { 386 this.scanner = scanner; 387 } 388 389 @Override 390 public void run() throws IOException { 391 this.scanner.close(); 392 } 393 } 394 395 /** 396 * An Rpc callback for doing shipped() call on a RegionScanner. 
397 */ 398 private class RegionScannerShippedCallBack implements RpcCallback { 399 400 private final String scannerName; 401 private final Shipper shipper; 402 private final Lease lease; 403 404 public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) { 405 this.scannerName = scannerName; 406 this.shipper = shipper; 407 this.lease = lease; 408 } 409 410 @Override 411 public void run() throws IOException { 412 this.shipper.shipped(); 413 // We're done. On way out re-add the above removed lease. The lease was temp removed for this 414 // Rpc call and we are at end of the call now. Time to add it back. 415 if (scanners.containsKey(scannerName)) { 416 if (lease != null) { 417 regionServer.getLeaseManager().addLease(lease); 418 } 419 } 420 } 421 } 422 423 /** 424 * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on 425 * completion of multiGets. 426 */ 427 static class RegionScannersCloseCallBack implements RpcCallback { 428 private final List<RegionScanner> scanners = new ArrayList<>(); 429 430 public void addScanner(RegionScanner scanner) { 431 this.scanners.add(scanner); 432 } 433 434 @Override 435 public void run() { 436 for (RegionScanner scanner : scanners) { 437 try { 438 scanner.close(); 439 } catch (IOException e) { 440 LOG.error("Exception while closing the scanner " + scanner, e); 441 } 442 } 443 } 444 } 445 446 /** 447 * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together. 
448 */ 449 private static final class RegionScannerHolder { 450 451 private final AtomicLong nextCallSeq = new AtomicLong(0); 452 private final String scannerName; 453 private final RegionScanner s; 454 private final HRegion r; 455 private final RpcCallback closeCallBack; 456 private final RpcCallback shippedCallback; 457 private byte[] rowOfLastPartialResult; 458 private boolean needCursor; 459 private boolean fullRegionScan; 460 461 public RegionScannerHolder(String scannerName, RegionScanner s, HRegion r, 462 RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor, 463 boolean fullRegionScan) { 464 this.scannerName = scannerName; 465 this.s = s; 466 this.r = r; 467 this.closeCallBack = closeCallBack; 468 this.shippedCallback = shippedCallback; 469 this.needCursor = needCursor; 470 this.fullRegionScan = fullRegionScan; 471 } 472 473 public long getNextCallSeq() { 474 return nextCallSeq.get(); 475 } 476 477 public boolean incNextCallSeq(long currentSeq) { 478 // Use CAS to prevent multiple scan request running on the same scanner. 479 return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1); 480 } 481 } 482 483 /** 484 * Instantiated as a scanner lease. 
If the lease times out, the scanner is 485 * closed 486 */ 487 private class ScannerListener implements LeaseListener { 488 private final String scannerName; 489 490 ScannerListener(final String n) { 491 this.scannerName = n; 492 } 493 494 @Override 495 public void leaseExpired() { 496 RegionScannerHolder rsh = scanners.remove(this.scannerName); 497 if (rsh != null) { 498 RegionScanner s = rsh.s; 499 LOG.info("Scanner " + this.scannerName + " lease expired on region " 500 + s.getRegionInfo().getRegionNameAsString()); 501 HRegion region = null; 502 try { 503 region = regionServer.getRegion(s.getRegionInfo().getRegionName()); 504 if (region != null && region.getCoprocessorHost() != null) { 505 region.getCoprocessorHost().preScannerClose(s); 506 } 507 } catch (IOException e) { 508 LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e); 509 } finally { 510 try { 511 s.close(); 512 if (region != null && region.getCoprocessorHost() != null) { 513 region.getCoprocessorHost().postScannerClose(s); 514 } 515 } catch (IOException e) { 516 LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e); 517 } 518 } 519 } else { 520 LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" + 521 " scanner found, hence no chance to close that related scanner!"); 522 } 523 } 524 } 525 526 private static ResultOrException getResultOrException(final ClientProtos.Result r, 527 final int index){ 528 return getResultOrException(ResponseConverter.buildActionResult(r), index); 529 } 530 531 private static ResultOrException getResultOrException(final Exception e, final int index) { 532 return getResultOrException(ResponseConverter.buildActionResult(e), index); 533 } 534 535 private static ResultOrException getResultOrException( 536 final ResultOrException.Builder builder, final int index) { 537 return builder.setIndex(index).build(); 538 } 539 540 /** 541 * Checks for the following pre-checks in order: 542 * <ol> 543 * 
<li>RegionServer is running</li> 544 * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li> 545 * </ol> 546 * @param requestName name of rpc request. Used in reporting failures to provide context. 547 * @throws ServiceException If any of the above listed pre-check fails. 548 */ 549 private void rpcPreCheck(String requestName) throws ServiceException { 550 try { 551 checkOpen(); 552 requirePermission(requestName, Permission.Action.ADMIN); 553 } catch (IOException ioe) { 554 throw new ServiceException(ioe); 555 } 556 } 557 558 private boolean isClientCellBlockSupport(RpcCallContext context) { 559 return context != null && context.isClientCellBlockSupported(); 560 } 561 562 private void addResult(final MutateResponse.Builder builder, final Result result, 563 final HBaseRpcController rpcc, boolean clientCellBlockSupported) { 564 if (result == null) return; 565 if (clientCellBlockSupported) { 566 builder.setResult(ProtobufUtil.toResultNoData(result)); 567 rpcc.setCellScanner(result.cellScanner()); 568 } else { 569 ClientProtos.Result pbr = ProtobufUtil.toResult(result); 570 builder.setResult(pbr); 571 } 572 } 573 574 private void addResults(ScanResponse.Builder builder, List<Result> results, 575 HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) { 576 builder.setStale(!isDefaultRegion); 577 if (results.isEmpty()) { 578 return; 579 } 580 if (clientCellBlockSupported) { 581 for (Result res : results) { 582 builder.addCellsPerResult(res.size()); 583 builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow()); 584 } 585 controller.setCellScanner(CellUtil.createCellScanner(results)); 586 } else { 587 for (Result res : results) { 588 ClientProtos.Result pbr = ProtobufUtil.toResult(res); 589 builder.addResults(pbr); 590 } 591 } 592 } 593 594 private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions, 595 CellScanner cellScanner, Condition condition, ActivePolicyEnforcement 
spaceQuotaEnforcement) 596 throws IOException { 597 int countOfCompleteMutation = 0; 598 try { 599 if (!region.getRegionInfo().isMetaRegion()) { 600 regionServer.getMemStoreFlusher().reclaimMemStoreMemory(); 601 } 602 List<Mutation> mutations = new ArrayList<>(); 603 for (ClientProtos.Action action: actions) { 604 if (action.hasGet()) { 605 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + 606 action.getGet()); 607 } 608 MutationType type = action.getMutation().getMutateType(); 609 switch (type) { 610 case PUT: 611 Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); 612 ++countOfCompleteMutation; 613 checkCellSizeLimit(region, put); 614 spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); 615 mutations.add(put); 616 break; 617 case DELETE: 618 Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner); 619 ++countOfCompleteMutation; 620 spaceQuotaEnforcement.getPolicyEnforcement(region).check(del); 621 mutations.add(del); 622 break; 623 case INCREMENT: 624 Increment increment = ProtobufUtil.toIncrement(action.getMutation(), cellScanner); 625 ++countOfCompleteMutation; 626 checkCellSizeLimit(region, increment); 627 spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment); 628 mutations.add(increment); 629 break; 630 case APPEND: 631 Append append = ProtobufUtil.toAppend(action.getMutation(), cellScanner); 632 ++countOfCompleteMutation; 633 checkCellSizeLimit(region, append); 634 spaceQuotaEnforcement.getPolicyEnforcement(region).check(append); 635 mutations.add(append); 636 break; 637 default: 638 throw new DoNotRetryIOException("invalid mutation type : " + type); 639 } 640 } 641 642 if (mutations.size() == 0) { 643 return new CheckAndMutateResult(true, null); 644 } else { 645 CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations); 646 CheckAndMutateResult result = null; 647 if (region.getCoprocessorHost() != null) { 648 result = 
region.getCoprocessorHost().preCheckAndMutate(checkAndMutate); 649 } 650 if (result == null) { 651 result = region.checkAndMutate(checkAndMutate); 652 if (region.getCoprocessorHost() != null) { 653 result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result); 654 } 655 } 656 return result; 657 } 658 } finally { 659 // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner 660 // even if the malformed cells are not skipped. 661 for (int i = countOfCompleteMutation; i < actions.size(); ++i) { 662 skipCellsForMutation(actions.get(i), cellScanner); 663 } 664 } 665 } 666 667 /** 668 * Execute an append mutation. 669 * 670 * @return result to return to client if default operation should be 671 * bypassed as indicated by RegionObserver, null otherwise 672 */ 673 private Result append(final HRegion region, final OperationQuota quota, 674 final MutationProto mutation, final CellScanner cellScanner, long nonceGroup, 675 ActivePolicyEnforcement spaceQuota) 676 throws IOException { 677 long before = EnvironmentEdgeManager.currentTime(); 678 Append append = ProtobufUtil.toAppend(mutation, cellScanner); 679 checkCellSizeLimit(region, append); 680 spaceQuota.getPolicyEnforcement(region).check(append); 681 quota.addMutation(append); 682 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 683 Result r = region.append(append, nonceGroup, nonce); 684 if (regionServer.getMetrics() != null) { 685 regionServer.getMetrics().updateAppend(region.getTableDescriptor().getTableName(), 686 EnvironmentEdgeManager.currentTime() - before); 687 } 688 return r == null ? Result.EMPTY_RESULT : r; 689 } 690 691 /** 692 * Execute an increment mutation. 
693 */ 694 private Result increment(final HRegion region, final OperationQuota quota, 695 final MutationProto mutation, final CellScanner cells, long nonceGroup, 696 ActivePolicyEnforcement spaceQuota) 697 throws IOException { 698 long before = EnvironmentEdgeManager.currentTime(); 699 Increment increment = ProtobufUtil.toIncrement(mutation, cells); 700 checkCellSizeLimit(region, increment); 701 spaceQuota.getPolicyEnforcement(region).check(increment); 702 quota.addMutation(increment); 703 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 704 Result r = region.increment(increment, nonceGroup, nonce); 705 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 706 if (metricsRegionServer != null) { 707 metricsRegionServer.updateIncrement(region.getTableDescriptor().getTableName(), 708 EnvironmentEdgeManager.currentTime() - before); 709 } 710 return r == null ? Result.EMPTY_RESULT : r; 711 } 712 713 /** 714 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when 715 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation. 716 * @param cellsToReturn Could be null. May be allocated in this method. This is what this 717 * method returns as a 'result'. 718 * @param closeCallBack the callback to be used with multigets 719 * @param context the current RpcCallContext 720 * @return Return the <code>cellScanner</code> passed 721 */ 722 private List<CellScannable> doNonAtomicRegionMutation(final HRegion region, 723 final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner, 724 final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup, 725 final RegionScannersCloseCallBack closeCallBack, RpcCallContext context, 726 ActivePolicyEnforcement spaceQuotaEnforcement) { 727 // Gather up CONTIGUOUS Puts and Deletes in this mutations List. 
Idea is that rather than do 728 // one at a time, we instead pass them in batch. Be aware that the corresponding 729 // ResultOrException instance that matches each Put or Delete is then added down in the 730 // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are 731 // deferred/batched 732 List<ClientProtos.Action> mutations = null; 733 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); 734 IOException sizeIOE = null; 735 Object lastBlock = null; 736 ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = ResultOrException.newBuilder(); 737 boolean hasResultOrException = false; 738 for (ClientProtos.Action action : actions.getActionList()) { 739 hasResultOrException = false; 740 resultOrExceptionBuilder.clear(); 741 try { 742 Result r = null; 743 744 if (context != null 745 && context.isRetryImmediatelySupported() 746 && (context.getResponseCellSize() > maxQuotaResultSize 747 || context.getResponseBlockSize() + context.getResponseExceptionSize() 748 > maxQuotaResultSize)) { 749 750 // We're storing the exception since the exception and reason string won't 751 // change after the response size limit is reached. 752 if (sizeIOE == null ) { 753 // We don't need the stack un-winding do don't throw the exception. 754 // Throwing will kill the JVM's JIT. 755 // 756 // Instead just create the exception and then store it. 757 sizeIOE = new MultiActionResultTooLarge("Max size exceeded" 758 + " CellSize: " + context.getResponseCellSize() 759 + " BlockSize: " + context.getResponseBlockSize()); 760 761 // Only report the exception once since there's only one request that 762 // caused the exception. Otherwise this number will dominate the exceptions count. 763 rpcServer.getMetrics().exception(sizeIOE); 764 } 765 766 // Now that there's an exception is known to be created 767 // use it for the response. 768 // 769 // This will create a copy in the builder. 
770 NameBytesPair pair = ResponseConverter.buildException(sizeIOE); 771 resultOrExceptionBuilder.setException(pair); 772 context.incrementResponseExceptionSize(pair.getSerializedSize()); 773 resultOrExceptionBuilder.setIndex(action.getIndex()); 774 builder.addResultOrException(resultOrExceptionBuilder.build()); 775 skipCellsForMutation(action, cellScanner); 776 continue; 777 } 778 if (action.hasGet()) { 779 long before = EnvironmentEdgeManager.currentTime(); 780 ClientProtos.Get pbGet = action.getGet(); 781 // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do 782 // a get closest before. Throwing the UnknownProtocolException signals it that it needs 783 // to switch and do hbase2 protocol (HBase servers do not tell clients what versions 784 // they are; its a problem for non-native clients like asynchbase. HBASE-20225. 785 if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) { 786 throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? 
" + 787 "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " + 788 "reverse Scan."); 789 } 790 try { 791 Get get = ProtobufUtil.toGet(pbGet); 792 if (context != null) { 793 r = get(get, (region), closeCallBack, context); 794 } else { 795 r = region.get(get); 796 } 797 } finally { 798 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 799 if (metricsRegionServer != null) { 800 metricsRegionServer.updateGet( 801 region.getTableDescriptor().getTableName(), 802 EnvironmentEdgeManager.currentTime() - before); 803 } 804 } 805 } else if (action.hasServiceCall()) { 806 hasResultOrException = true; 807 com.google.protobuf.Message result = 808 execServiceOnRegion(region, action.getServiceCall()); 809 ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder = 810 ClientProtos.CoprocessorServiceResult.newBuilder(); 811 resultOrExceptionBuilder.setServiceResult( 812 serviceResultBuilder.setValue( 813 serviceResultBuilder.getValueBuilder() 814 .setName(result.getClass().getName()) 815 // TODO: Copy!!! 816 .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray())))); 817 } else if (action.hasMutation()) { 818 MutationType type = action.getMutation().getMutateType(); 819 if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null && 820 !mutations.isEmpty()) { 821 // Flush out any Puts or Deletes already collected. 
822 doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, 823 spaceQuotaEnforcement); 824 mutations.clear(); 825 } 826 switch (type) { 827 case APPEND: 828 r = append(region, quota, action.getMutation(), cellScanner, nonceGroup, 829 spaceQuotaEnforcement); 830 break; 831 case INCREMENT: 832 r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup, 833 spaceQuotaEnforcement); 834 break; 835 case PUT: 836 case DELETE: 837 // Collect the individual mutations and apply in a batch 838 if (mutations == null) { 839 mutations = new ArrayList<>(actions.getActionCount()); 840 } 841 mutations.add(action); 842 break; 843 default: 844 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); 845 } 846 } else { 847 throw new HBaseIOException("Unexpected Action type"); 848 } 849 if (r != null) { 850 ClientProtos.Result pbResult = null; 851 if (isClientCellBlockSupport(context)) { 852 pbResult = ProtobufUtil.toResultNoData(r); 853 // Hard to guess the size here. Just make a rough guess. 854 if (cellsToReturn == null) { 855 cellsToReturn = new ArrayList<>(); 856 } 857 cellsToReturn.add(r); 858 } else { 859 pbResult = ProtobufUtil.toResult(r); 860 } 861 lastBlock = addSize(context, r, lastBlock); 862 hasResultOrException = true; 863 resultOrExceptionBuilder.setResult(pbResult); 864 } 865 // Could get to here and there was no result and no exception. Presumes we added 866 // a Put or Delete to the collecting Mutations List for adding later. In this 867 // case the corresponding ResultOrException instance for the Put or Delete will be added 868 // down in the doNonAtomicBatchOp method call rather than up here. 
869 } catch (IOException ie) { 870 rpcServer.getMetrics().exception(ie); 871 hasResultOrException = true; 872 NameBytesPair pair = ResponseConverter.buildException(ie); 873 resultOrExceptionBuilder.setException(pair); 874 context.incrementResponseExceptionSize(pair.getSerializedSize()); 875 } 876 if (hasResultOrException) { 877 // Propagate index. 878 resultOrExceptionBuilder.setIndex(action.getIndex()); 879 builder.addResultOrException(resultOrExceptionBuilder.build()); 880 } 881 } 882 // Finish up any outstanding mutations 883 if (!CollectionUtils.isEmpty(mutations)) { 884 doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); 885 } 886 return cellsToReturn; 887 } 888 889 private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException { 890 if (r.maxCellSize > 0) { 891 CellScanner cells = m.cellScanner(); 892 while (cells.advance()) { 893 int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current()); 894 if (size > r.maxCellSize) { 895 String msg = "Cell with size " + size + " exceeds limit of " + r.maxCellSize + " bytes"; 896 LOG.debug(msg); 897 throw new DoNotRetryIOException(msg); 898 } 899 } 900 } 901 } 902 903 private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, 904 final OperationQuota quota, final List<ClientProtos.Action> mutations, 905 final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) 906 throws IOException { 907 // Just throw the exception. The exception will be caught and then added to region-level 908 // exception for RegionAction. Leaving the null to action result is ok since the null 909 // result is viewed as failure by hbase client. And the region-lever exception will be used 910 // to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and 911 // AsyncBatchRpcRetryingCaller#onComplete for more details. 
doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, true);
  }

  /**
   * Runs the given mutations as a non-atomic batch. If the batch as a whole fails with an
   * IOException, that exception is recorded against every action of the batch instead of being
   * propagated.
   */
  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
      final OperationQuota quota, final List<ClientProtos.Action> mutations,
      final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
    try {
      doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, false);
    } catch (IOException e) {
      // Set the exception for each action. The mutations in the same RegionAction are grouped
      // into different batches and then processed individually. Hence, we don't set the
      // region-level exception here for the whole RegionAction.
      for (Action mutation : mutations) {
        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
      }
    }
  }

  /**
   * Execute a list of mutations.
   * <p>
   * Each action is converted to a Mutation (reading its cells from <code>cells</code>), checked
   * against the cell-size limit and the space quota, and recorded against the operation quota.
   * The whole array is then handed to the region in a single <code>batchMutate</code> call and
   * one ResultOrException per mutation is added to <code>builder</code>.
   *
   * @param builder receives the per-mutation results or exceptions
   * @param region the region the mutations are applied to
   * @param quota operation quota every mutation is recorded against
   * @param mutations the actions to execute; an action carrying a Get is rejected
   * @param cells scanner the mutation cells are read from; cells of actions that were never
   *          converted are skipped in the finally block
   * @param spaceQuotaEnforcement source of the space-quota policy checked per mutation
   * @param atomic passed to <code>batchMutate</code>; when false the mutations are sorted first,
   *          when true their order is kept and Increment/Append results are merged into the
   *          first ResultOrException
   * @throws IOException if conversion, a limit/quota check or the batch itself fails
   */
  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
      final OperationQuota quota, final List<ClientProtos.Action> mutations,
      final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
      throws IOException {
    Mutation[] mArray = new Mutation[mutations.size()];
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      /** HBASE-17924
       * mutationActionMap is a map to map the relation between mutations and actions
       * since mutation array may have been reordered. In order to return the right
       * result or exception to the corresponding actions, we need to know which action
       * the mutation belongs to. We can't sort the ClientProtos.Action array, since the actions
       * are bound to the cell scanner.
       */
      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
      int i = 0;
      for (ClientProtos.Action action: mutations) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
            action.getGet());
        }
        MutationProto m = action.getMutation();
        Mutation mutation;
        switch (m.getMutateType()) {
          case PUT:
            mutation = ProtobufUtil.toPut(m, cells);
            batchContainsPuts = true;
            break;

          case DELETE:
            mutation = ProtobufUtil.toDelete(m, cells);
            batchContainsDelete = true;
            break;

          case INCREMENT:
            mutation = ProtobufUtil.toIncrement(m, cells);
            break;

          case APPEND:
            mutation = ProtobufUtil.toAppend(m, cells);
            break;

          default:
            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
        }
        mutationActionMap.put(mutation, action);
        // Stored before the checks below: a non-null slot marks this action's cells as already
        // read, which the finally block relies on.
        mArray[i++] = mutation;
        checkCellSizeLimit(region, mutation);
        // Check if a space quota disallows this mutation
        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
        quota.addMutation(mutation);
      }

      if (!region.getRegionInfo().isMetaRegion()) {
        regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
      }

      // HBASE-17924
      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
      // order is preserved as its expected from the client
      if (!atomic) {
        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
      }

      OperationStatus[] codes = region.batchMutate(mArray, atomic, HConstants.NO_NONCE,
        HConstants.NO_NONCE);

      // When atomic is true, it indicates that the mutateRow API or the batch API with
      // RowMutations is called. In this case, we need to merge the results of the
      // Increment/Append operations if the mutations include those operations, and set the merged
      // result to the first element of the ResultOrException list
      if (atomic) {
        List<ResultOrException> resultOrExceptions = new ArrayList<>();
        List<Result> results = new ArrayList<>();
        for (i = 0; i < codes.length; i++) {
          if (codes[i].getResult() != null) {
            results.add(codes[i].getResult());
          }
          // Entries 1..n-1 are empty placeholders; entry 0 is built below.
          if (i != 0) {
            resultOrExceptions.add(getResultOrException(
              ClientProtos.Result.getDefaultInstance(), i));
          }
        }

        if (results.isEmpty()) {
          builder.addResultOrException(getResultOrException(
            ClientProtos.Result.getDefaultInstance(), 0));
        } else {
          // Merge the results of the Increment/Append operations
          List<Cell> cellList = new ArrayList<>();
          for (Result result : results) {
            if (result.rawCells() != null) {
              cellList.addAll(Arrays.asList(result.rawCells()));
            }
          }
          Result result = Result.create(cellList);

          // Set the merged result of the Increment/Append operations to the first element of the
          // ResultOrException list
          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
        }

        builder.addAllResultOrException(resultOrExceptions);
        return;
      }

      // Non-atomic: translate each status code into a result or an exception, reported under the
      // index of the action the (possibly reordered) mutation came from.
      for (i = 0; i < codes.length; i++) {
        Mutation currentMutation = mArray[i];
        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
        Exception e;
        switch (codes[i].getOperationStatusCode()) {
          case BAD_FAMILY:
            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SANITY_CHECK_FAILURE:
            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          default:
            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SUCCESS:
            builder.addResultOrException(getResultOrException(
              ClientProtos.Result.getDefaultInstance(), index));
            break;

          case STORE_TOO_BUSY:
            e = new RegionTooBusyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;
        }
      }
    } finally {
      int processedMutationIndex = 0;
      for (Action mutation : mutations) {
        // The non-null mArray[i] means the cell scanner has been read.
        if (mArray[processedMutationIndex++] == null) {
          skipCellsForMutation(mutation, cells);
        }
      }
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  /**
   * Reports the time elapsed since <code>starttime</code> as put-batch and/or delete-batch
   * latency for the region's table. Does nothing when the region server has no metrics.
   *
   * @param region the region whose table name the metrics are recorded under
   * @param starttime start timestamp, compared against EnvironmentEdgeManager.currentTime()
   * @param batchContainsPuts whether to update the put-batch metric
   * @param batchContainsDelete whether to update the delete-batch metric
   */
  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
    boolean batchContainsDelete) {
    final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      if (batchContainsPuts) {
        metricsRegionServer.updatePutBatch(
            region.getTableDescriptor().getTableName(), after - starttime);
      }
      if (batchContainsDelete) {
        metricsRegionServer.updateDeleteBatch(
            region.getTableDescriptor().getTableName(), after - starttime);
      }
    }
  }

  /**
   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
1107 * @param region 1108 * @param mutations 1109 * @param replaySeqId 1110 * @return an array of OperationStatus which internally contains the OperationStatusCode and the 1111 * exceptionMessage if any 1112 * @throws IOException 1113 */ 1114 private OperationStatus [] doReplayBatchOp(final HRegion region, 1115 final List<MutationReplay> mutations, long replaySeqId) throws IOException { 1116 long before = EnvironmentEdgeManager.currentTime(); 1117 boolean batchContainsPuts = false, batchContainsDelete = false; 1118 try { 1119 for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) { 1120 MutationReplay m = it.next(); 1121 1122 if (m.getType() == MutationType.PUT) { 1123 batchContainsPuts = true; 1124 } else { 1125 batchContainsDelete = true; 1126 } 1127 1128 NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap(); 1129 List<Cell> metaCells = map.get(WALEdit.METAFAMILY); 1130 if (metaCells != null && !metaCells.isEmpty()) { 1131 for (Cell metaCell : metaCells) { 1132 CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell); 1133 boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); 1134 HRegion hRegion = region; 1135 if (compactionDesc != null) { 1136 // replay the compaction. 
Remove the files from stores only if we are the primary 1137 // region replica (thus own the files) 1138 hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica, 1139 replaySeqId); 1140 continue; 1141 } 1142 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell); 1143 if (flushDesc != null && !isDefaultReplica) { 1144 hRegion.replayWALFlushMarker(flushDesc, replaySeqId); 1145 continue; 1146 } 1147 RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell); 1148 if (regionEvent != null && !isDefaultReplica) { 1149 hRegion.replayWALRegionEventMarker(regionEvent); 1150 continue; 1151 } 1152 BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell); 1153 if (bulkLoadEvent != null) { 1154 hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent); 1155 continue; 1156 } 1157 } 1158 it.remove(); 1159 } 1160 } 1161 requestCount.increment(); 1162 if (!region.getRegionInfo().isMetaRegion()) { 1163 regionServer.getMemStoreFlusher().reclaimMemStoreMemory(); 1164 } 1165 return region.batchReplay(mutations.toArray( 1166 new MutationReplay[mutations.size()]), replaySeqId); 1167 } finally { 1168 updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete); 1169 } 1170 } 1171 1172 private void closeAllScanners() { 1173 // Close any outstanding scanners. Means they'll get an UnknownScanner 1174 // exception next time they come in. 
1175 for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) { 1176 try { 1177 e.getValue().s.close(); 1178 } catch (IOException ioe) { 1179 LOG.warn("Closing scanner " + e.getKey(), ioe); 1180 } 1181 } 1182 } 1183 1184 // Directly invoked only for testing 1185 public RSRpcServices(final HRegionServer rs) throws IOException { 1186 final Configuration conf = rs.getConfiguration(); 1187 regionServer = rs; 1188 rowSizeWarnThreshold = conf.getInt( 1189 HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT); 1190 rejectRowsWithSizeOverThreshold = 1191 conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD); 1192 1193 final RpcSchedulerFactory rpcSchedulerFactory; 1194 try { 1195 rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class) 1196 .getDeclaredConstructor().newInstance(); 1197 } catch (NoSuchMethodException | InvocationTargetException | 1198 InstantiationException | IllegalAccessException e) { 1199 throw new IllegalArgumentException(e); 1200 } 1201 // Server to handle client requests. 1202 final InetSocketAddress initialIsa; 1203 final InetSocketAddress bindAddress; 1204 if(this instanceof MasterRpcServices) { 1205 String hostname = DNS.getHostname(conf, DNS.ServerType.MASTER); 1206 int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT); 1207 // Creation of a HSA will force a resolve. 1208 initialIsa = new InetSocketAddress(hostname, port); 1209 bindAddress = new InetSocketAddress(conf.get("hbase.master.ipc.address", hostname), port); 1210 } else { 1211 String hostname = DNS.getHostname(conf, DNS.ServerType.REGIONSERVER); 1212 int port = conf.getInt(HConstants.REGIONSERVER_PORT, 1213 HConstants.DEFAULT_REGIONSERVER_PORT); 1214 // Creation of a HSA will force a resolve. 
1215 initialIsa = new InetSocketAddress(hostname, port); 1216 bindAddress = 1217 new InetSocketAddress(conf.get("hbase.regionserver.ipc.address", hostname), port); 1218 } 1219 if (initialIsa.getAddress() == null) { 1220 throw new IllegalArgumentException("Failed resolve of " + initialIsa); 1221 } 1222 priority = createPriority(); 1223 // Using Address means we don't get the IP too. Shorten it more even to just the host name 1224 // w/o the domain. 1225 final String name = rs.getProcessName() + "/" + 1226 Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain(); 1227 // Set how many times to retry talking to another server over Connection. 1228 ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG); 1229 rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name); 1230 rpcServer.setRsRpcServices(this); 1231 if (!(rs instanceof HMaster)) { 1232 rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder()); 1233 } 1234 scannerLeaseTimeoutPeriod = conf.getInt( 1235 HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 1236 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); 1237 maxScannerResultSize = conf.getLong( 1238 HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 1239 HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); 1240 rpcTimeout = conf.getInt( 1241 HConstants.HBASE_RPC_TIMEOUT_KEY, 1242 HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 1243 minimumScanTimeLimitDelta = conf.getLong( 1244 REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, 1245 DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA); 1246 1247 final InetSocketAddress address = rpcServer.getListenerAddress(); 1248 if (address == null) { 1249 throw new IOException("Listener channel is closed"); 1250 } 1251 // Set our address, however we need the final port that was given to rpcServer 1252 isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort()); 1253 rpcServer.setErrorHandler(this); 1254 rs.setName(name); 1255 1256 
closedScanners = CacheBuilder.newBuilder() 1257 .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build(); 1258 } 1259 1260 protected RpcServerInterface createRpcServer( 1261 final Server server, 1262 final RpcSchedulerFactory rpcSchedulerFactory, 1263 final InetSocketAddress bindAddress, 1264 final String name 1265 ) throws IOException { 1266 final Configuration conf = server.getConfiguration(); 1267 boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true); 1268 try { 1269 return RpcServerFactory.createRpcServer(server, name, getServices(), 1270 bindAddress, // use final bindAddress for this server. 1271 conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled); 1272 } catch (BindException be) { 1273 throw new IOException(be.getMessage() + ". To switch ports use the '" 1274 + HConstants.REGIONSERVER_PORT + "' configuration property.", 1275 be.getCause() != null ? be.getCause() : be); 1276 } 1277 } 1278 1279 protected Class<?> getRpcSchedulerFactoryClass() { 1280 final Configuration conf = regionServer.getConfiguration(); 1281 return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, 1282 SimpleRpcSchedulerFactory.class); 1283 } 1284 1285 @Override 1286 public void onConfigurationChange(Configuration newConf) { 1287 if (rpcServer instanceof ConfigurationObserver) { 1288 ((ConfigurationObserver)rpcServer).onConfigurationChange(newConf); 1289 } 1290 } 1291 1292 protected PriorityFunction createPriority() { 1293 return new AnnotationReadingPriorityFunction(this); 1294 } 1295 1296 protected void requirePermission(String request, Permission.Action perm) throws IOException { 1297 if (accessChecker != null) { 1298 accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), request, null, perm); 1299 } 1300 } 1301 1302 public int getScannersCount() { 1303 return scanners.size(); 1304 } 1305 1306 public 1307 RegionScanner getScanner(long scannerId) { 1308 String scannerIdString = 
Long.toString(scannerId); 1309 RegionScannerHolder scannerHolder = scanners.get(scannerIdString); 1310 if (scannerHolder != null) { 1311 return scannerHolder.s; 1312 } 1313 return null; 1314 } 1315 1316 public String getScanDetailsWithId(long scannerId) { 1317 RegionScanner scanner = getScanner(scannerId); 1318 if (scanner == null) { 1319 return null; 1320 } 1321 StringBuilder builder = new StringBuilder(); 1322 builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString()); 1323 builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString()); 1324 return builder.toString(); 1325 } 1326 1327 public String getScanDetailsWithRequest(ScanRequest request) { 1328 try { 1329 if (!request.hasRegion()) { 1330 return null; 1331 } 1332 Region region = getRegion(request.getRegion()); 1333 StringBuilder builder = new StringBuilder(); 1334 builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString()); 1335 builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString()); 1336 return builder.toString(); 1337 } catch (IOException ignored) { 1338 return null; 1339 } 1340 } 1341 1342 /** 1343 * Get the vtime associated with the scanner. 1344 * Currently the vtime is the number of "next" calls. 1345 */ 1346 long getScannerVirtualTime(long scannerId) { 1347 String scannerIdString = Long.toString(scannerId); 1348 RegionScannerHolder scannerHolder = scanners.get(scannerIdString); 1349 if (scannerHolder != null) { 1350 return scannerHolder.getNextCallSeq(); 1351 } 1352 return 0L; 1353 } 1354 1355 /** 1356 * Method to account for the size of retained cells and retained data blocks. 1357 * @param context rpc call context 1358 * @param r result to add size. 1359 * @param lastBlock last block to check whether we need to add the block size in context. 1360 * @return an object that represents the last referenced block from this response. 
1361 */ 1362 Object addSize(RpcCallContext context, Result r, Object lastBlock) { 1363 if (context != null && r != null && !r.isEmpty()) { 1364 for (Cell c : r.rawCells()) { 1365 context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c)); 1366 1367 // Since byte buffers can point all kinds of crazy places it's harder to keep track 1368 // of which blocks are kept alive by what byte buffer. 1369 // So we make a guess. 1370 if (c instanceof ByteBufferExtendedCell) { 1371 ByteBufferExtendedCell bbCell = (ByteBufferExtendedCell) c; 1372 ByteBuffer bb = bbCell.getValueByteBuffer(); 1373 if (bb != lastBlock) { 1374 context.incrementResponseBlockSize(bb.capacity()); 1375 lastBlock = bb; 1376 } 1377 } else { 1378 // We're using the last block being the same as the current block as 1379 // a proxy for pointing to a new block. This won't be exact. 1380 // If there are multiple gets that bounce back and forth 1381 // Then it's possible that this will over count the size of 1382 // referenced blocks. However it's better to over count and 1383 // use two rpcs than to OOME the regionserver. 
1384 byte[] valueArray = c.getValueArray(); 1385 if (valueArray != lastBlock) { 1386 context.incrementResponseBlockSize(valueArray.length); 1387 lastBlock = valueArray; 1388 } 1389 } 1390 1391 } 1392 } 1393 return lastBlock; 1394 } 1395 1396 private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper, 1397 HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException { 1398 Lease lease = regionServer.getLeaseManager().createLease( 1399 scannerName, this.scannerLeaseTimeoutPeriod, new ScannerListener(scannerName)); 1400 RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease); 1401 RpcCallback closeCallback; 1402 if (s instanceof RpcCallback) { 1403 closeCallback = (RpcCallback) s; 1404 } else { 1405 closeCallback = new RegionScannerCloseCallBack(s); 1406 } 1407 1408 RegionScannerHolder rsh = 1409 new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback, 1410 needCursor, fullRegionScan); 1411 RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh); 1412 assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! 
" + 1413 scannerName; 1414 return rsh; 1415 } 1416 1417 private boolean isFullRegionScan(Scan scan, HRegion region) { 1418 // If the scan start row equals or less than the start key of the region 1419 // and stop row greater than equals end key (if stop row present) 1420 // or if the stop row is empty 1421 // account this as a full region scan 1422 if (Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0 1423 && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0 && 1424 !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW) 1425 || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))) { 1426 return true; 1427 } 1428 return false; 1429 } 1430 1431 /** 1432 * Find the HRegion based on a region specifier 1433 * 1434 * @param regionSpecifier the region specifier 1435 * @return the corresponding region 1436 * @throws IOException if the specifier is not null, 1437 * but failed to find the region 1438 */ 1439 public HRegion getRegion( 1440 final RegionSpecifier regionSpecifier) throws IOException { 1441 return regionServer.getRegion(regionSpecifier.getValue().toByteArray()); 1442 } 1443 1444 /** 1445 * Find the List of HRegions based on a list of region specifiers 1446 * 1447 * @param regionSpecifiers the list of region specifiers 1448 * @return the corresponding list of regions 1449 * @throws IOException if any of the specifiers is not null, 1450 * but failed to find the region 1451 */ 1452 private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers, 1453 final CacheEvictionStatsBuilder stats) { 1454 List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size()); 1455 for (RegionSpecifier regionSpecifier: regionSpecifiers) { 1456 try { 1457 regions.add(regionServer.getRegion(regionSpecifier.getValue().toByteArray())); 1458 } catch (NotServingRegionException e) { 1459 stats.addException(regionSpecifier.getValue().toByteArray(), e); 1460 } 1461 } 1462 return 
regions; 1463 } 1464 1465 public PriorityFunction getPriority() { 1466 return priority; 1467 } 1468 1469 public Configuration getConfiguration() { 1470 return regionServer.getConfiguration(); 1471 } 1472 1473 private RegionServerRpcQuotaManager getRpcQuotaManager() { 1474 return regionServer.getRegionServerRpcQuotaManager(); 1475 } 1476 1477 private RegionServerSpaceQuotaManager getSpaceQuotaManager() { 1478 return regionServer.getRegionServerSpaceQuotaManager(); 1479 } 1480 1481 void start(ZKWatcher zkWatcher) { 1482 if (AccessChecker.isAuthorizationSupported(getConfiguration())) { 1483 accessChecker = new AccessChecker(getConfiguration()); 1484 } else { 1485 accessChecker = new NoopAccessChecker(getConfiguration()); 1486 } 1487 zkPermissionWatcher = 1488 new ZKPermissionWatcher(zkWatcher, accessChecker.getAuthManager(), getConfiguration()); 1489 try { 1490 zkPermissionWatcher.start(); 1491 } catch (KeeperException e) { 1492 LOG.error("ZooKeeper permission watcher initialization failed", e); 1493 } 1494 this.scannerIdGenerator = new ScannerIdGenerator(this.regionServer.serverName); 1495 rpcServer.start(); 1496 } 1497 1498 void stop() { 1499 if (zkPermissionWatcher != null) { 1500 zkPermissionWatcher.close(); 1501 } 1502 closeAllScanners(); 1503 rpcServer.stop(); 1504 } 1505 1506 /** 1507 * Called to verify that this server is up and running. 1508 */ 1509 // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name). 
1510 protected void checkOpen() throws IOException { 1511 if (regionServer.isAborted()) { 1512 throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting"); 1513 } 1514 if (regionServer.isStopped()) { 1515 throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping"); 1516 } 1517 if (!regionServer.isDataFileSystemOk()) { 1518 throw new RegionServerStoppedException("File system not available"); 1519 } 1520 if (!regionServer.isOnline()) { 1521 throw new ServerNotRunningYetException("Server " + regionServer.serverName 1522 + " is not running yet"); 1523 } 1524 } 1525 1526 /** 1527 * By default, put up an Admin and a Client Service. 1528 * Set booleans <code>hbase.regionserver.admin.executorService</code> and 1529 * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services. 1530 * Default is that both are enabled. 1531 * @return immutable list of blocking services and the security info classes that this server 1532 * supports 1533 */ 1534 protected List<BlockingServiceAndInterface> getServices() { 1535 boolean admin = 1536 getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true); 1537 boolean client = 1538 getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true); 1539 List<BlockingServiceAndInterface> bssi = new ArrayList<>(); 1540 if (client) { 1541 bssi.add(new BlockingServiceAndInterface( 1542 ClientService.newReflectiveBlockingService(this), 1543 ClientService.BlockingInterface.class)); 1544 } 1545 if (admin) { 1546 bssi.add(new BlockingServiceAndInterface( 1547 AdminService.newReflectiveBlockingService(this), 1548 AdminService.BlockingInterface.class)); 1549 } 1550 return new org.apache.hbase.thirdparty.com.google.common.collect. 
1551 ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build(); 1552 } 1553 1554 public InetSocketAddress getSocketAddress() { 1555 return isa; 1556 } 1557 1558 @Override 1559 public int getPriority(RequestHeader header, Message param, User user) { 1560 return priority.getPriority(header, param, user); 1561 } 1562 1563 @Override 1564 public long getDeadline(RequestHeader header, Message param) { 1565 return priority.getDeadline(header, param); 1566 } 1567 1568 /* 1569 * Check if an OOME and, if so, abort immediately to avoid creating more objects. 1570 * 1571 * @param e 1572 * 1573 * @return True if we OOME'd and are aborting. 1574 */ 1575 @Override 1576 public boolean checkOOME(final Throwable e) { 1577 return exitIfOOME(e); 1578 } 1579 1580 public static boolean exitIfOOME(final Throwable e ){ 1581 boolean stop = false; 1582 try { 1583 if (e instanceof OutOfMemoryError 1584 || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError) 1585 || (e.getMessage() != null && e.getMessage().contains( 1586 "java.lang.OutOfMemoryError"))) { 1587 stop = true; 1588 LOG.error(HBaseMarkers.FATAL, "Run out of memory; " 1589 + RSRpcServices.class.getSimpleName() + " will abort itself immediately", 1590 e); 1591 } 1592 } finally { 1593 if (stop) { 1594 Runtime.getRuntime().halt(1); 1595 } 1596 } 1597 return stop; 1598 } 1599 1600 /** 1601 * Close a region on the region server. 1602 * 1603 * @param controller the RPC controller 1604 * @param request the request 1605 * @throws ServiceException 1606 */ 1607 @Override 1608 @QosPriority(priority=HConstants.ADMIN_QOS) 1609 public CloseRegionResponse closeRegion(final RpcController controller, 1610 final CloseRegionRequest request) throws ServiceException { 1611 final ServerName sn = (request.hasDestinationServer() ? 
1612 ProtobufUtil.toServerName(request.getDestinationServer()) : null); 1613 1614 try { 1615 checkOpen(); 1616 throwOnWrongStartCode(request); 1617 final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion()); 1618 1619 requestCount.increment(); 1620 if (sn == null) { 1621 LOG.info("Close " + encodedRegionName + " without moving"); 1622 } else { 1623 LOG.info("Close " + encodedRegionName + ", moving to " + sn); 1624 } 1625 boolean closed = regionServer.closeRegion(encodedRegionName, false, sn); 1626 CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed); 1627 return builder.build(); 1628 } catch (IOException ie) { 1629 throw new ServiceException(ie); 1630 } 1631 } 1632 1633 /** 1634 * Compact a region on the region server. 1635 * 1636 * @param controller the RPC controller 1637 * @param request the request 1638 * @throws ServiceException 1639 */ 1640 @Override 1641 @QosPriority(priority = HConstants.ADMIN_QOS) 1642 public CompactRegionResponse compactRegion(final RpcController controller, 1643 final CompactRegionRequest request) throws ServiceException { 1644 try { 1645 checkOpen(); 1646 requestCount.increment(); 1647 HRegion region = getRegion(request.getRegion()); 1648 // Quota support is enabled, the requesting user is not system/super user 1649 // and a quota policy is enforced that disables compactions. 
1650 if (QuotaUtil.isQuotaEnabled(getConfiguration()) && 1651 !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null)) && 1652 this.regionServer.getRegionServerSpaceQuotaManager() 1653 .areCompactionsDisabled(region.getTableDescriptor().getTableName())) { 1654 throw new DoNotRetryIOException( 1655 "Compactions on this region are " + "disabled due to a space quota violation."); 1656 } 1657 region.startRegionOperation(Operation.COMPACT_REGION); 1658 LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString()); 1659 boolean major = request.hasMajor() && request.getMajor(); 1660 if (request.hasFamily()) { 1661 byte[] family = request.getFamily().toByteArray(); 1662 String log = "User-triggered " + (major ? "major " : "") + "compaction for region " + 1663 region.getRegionInfo().getRegionNameAsString() + " and family " + 1664 Bytes.toString(family); 1665 LOG.trace(log); 1666 region.requestCompaction(family, log, Store.PRIORITY_USER, major, 1667 CompactionLifeCycleTracker.DUMMY); 1668 } else { 1669 String log = "User-triggered " + (major ? "major " : "") + "compaction for region " + 1670 region.getRegionInfo().getRegionNameAsString(); 1671 LOG.trace(log); 1672 region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY); 1673 } 1674 return CompactRegionResponse.newBuilder().build(); 1675 } catch (IOException ie) { 1676 throw new ServiceException(ie); 1677 } 1678 } 1679 1680 @Override 1681 public CompactionSwitchResponse compactionSwitch(RpcController controller, 1682 CompactionSwitchRequest request) throws ServiceException { 1683 try { 1684 checkOpen(); 1685 requestCount.increment(); 1686 boolean prevState = regionServer.compactSplitThread.isCompactionsEnabled(); 1687 CompactionSwitchResponse response = 1688 CompactionSwitchResponse.newBuilder().setPrevState(prevState).build(); 1689 if (prevState == request.getEnabled()) { 1690 // passed in requested state is same as current state. 
No action required 1691 return response; 1692 } 1693 regionServer.compactSplitThread.switchCompaction(request.getEnabled()); 1694 return response; 1695 } catch (IOException ie) { 1696 throw new ServiceException(ie); 1697 } 1698 } 1699 1700 /** 1701 * Flush a region on the region server. 1702 * 1703 * @param controller the RPC controller 1704 * @param request the request 1705 * @throws ServiceException 1706 */ 1707 @Override 1708 @QosPriority(priority=HConstants.ADMIN_QOS) 1709 public FlushRegionResponse flushRegion(final RpcController controller, 1710 final FlushRegionRequest request) throws ServiceException { 1711 try { 1712 checkOpen(); 1713 requestCount.increment(); 1714 HRegion region = getRegion(request.getRegion()); 1715 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); 1716 boolean shouldFlush = true; 1717 if (request.hasIfOlderThanTs()) { 1718 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs(); 1719 } 1720 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); 1721 if (shouldFlush) { 1722 boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ? 
1723 request.getWriteFlushWalMarker() : false; 1724 // Go behind the curtain so we can manage writing of the flush WAL marker 1725 HRegion.FlushResultImpl flushResult = null; 1726 if (request.hasFamily()) { 1727 List families = new ArrayList(); 1728 families.add(request.getFamily().toByteArray()); 1729 flushResult = 1730 region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1731 } else { 1732 flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1733 } 1734 boolean compactionNeeded = flushResult.isCompactionNeeded(); 1735 if (compactionNeeded) { 1736 regionServer.compactSplitThread.requestSystemCompaction(region, 1737 "Compaction through user triggered flush"); 1738 } 1739 builder.setFlushed(flushResult.isFlushSucceeded()); 1740 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker); 1741 } 1742 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores()); 1743 return builder.build(); 1744 } catch (DroppedSnapshotException ex) { 1745 // Cache flush can fail in a few places. If it fails in a critical 1746 // section, we get a DroppedSnapshotException and a replay of wal 1747 // is required. Currently the only way to do this is a restart of 1748 // the server. 1749 regionServer.abort("Replay of WAL required. 
Forcing server shutdown", ex); 1750 throw new ServiceException(ex); 1751 } catch (IOException ie) { 1752 throw new ServiceException(ie); 1753 } 1754 } 1755 1756 @Override 1757 @QosPriority(priority=HConstants.ADMIN_QOS) 1758 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, 1759 final GetOnlineRegionRequest request) throws ServiceException { 1760 try { 1761 checkOpen(); 1762 requestCount.increment(); 1763 Map<String, HRegion> onlineRegions = regionServer.getOnlineRegions(); 1764 List<RegionInfo> list = new ArrayList<>(onlineRegions.size()); 1765 for (HRegion region: onlineRegions.values()) { 1766 list.add(region.getRegionInfo()); 1767 } 1768 list.sort(RegionInfo.COMPARATOR); 1769 return ResponseConverter.buildGetOnlineRegionResponse(list); 1770 } catch (IOException ie) { 1771 throw new ServiceException(ie); 1772 } 1773 } 1774 1775 // Master implementation of this Admin Service differs given it is not 1776 // able to supply detail only known to RegionServer. See note on 1777 // MasterRpcServers#getRegionInfo. 
1778 @Override 1779 @QosPriority(priority=HConstants.ADMIN_QOS) 1780 public GetRegionInfoResponse getRegionInfo(final RpcController controller, 1781 final GetRegionInfoRequest request) throws ServiceException { 1782 try { 1783 checkOpen(); 1784 requestCount.increment(); 1785 HRegion region = getRegion(request.getRegion()); 1786 RegionInfo info = region.getRegionInfo(); 1787 byte[] bestSplitRow; 1788 if (request.hasBestSplitRow() && request.getBestSplitRow()) { 1789 bestSplitRow = region.checkSplit(true).orElse(null); 1790 // when all table data are in memstore, bestSplitRow = null 1791 // try to flush region first 1792 if (bestSplitRow == null) { 1793 region.flush(true); 1794 bestSplitRow = region.checkSplit(true).orElse(null); 1795 } 1796 } else { 1797 bestSplitRow = null; 1798 } 1799 GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder(); 1800 builder.setRegionInfo(ProtobufUtil.toRegionInfo(info)); 1801 if (request.hasCompactionState() && request.getCompactionState()) { 1802 builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState())); 1803 } 1804 builder.setSplittable(region.isSplittable()); 1805 builder.setMergeable(region.isMergeable()); 1806 if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) { 1807 builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow)); 1808 } 1809 return builder.build(); 1810 } catch (IOException ie) { 1811 throw new ServiceException(ie); 1812 } 1813 } 1814 1815 @Override 1816 @QosPriority(priority=HConstants.ADMIN_QOS) 1817 public GetRegionLoadResponse getRegionLoad(RpcController controller, 1818 GetRegionLoadRequest request) throws ServiceException { 1819 1820 List<HRegion> regions; 1821 if (request.hasTableName()) { 1822 TableName tableName = ProtobufUtil.toTableName(request.getTableName()); 1823 regions = regionServer.getRegions(tableName); 1824 } else { 1825 regions = regionServer.getRegions(); 1826 } 1827 List<RegionLoad> rLoads = new 
ArrayList<>(regions.size()); 1828 RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder(); 1829 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder(); 1830 1831 try { 1832 for (HRegion region : regions) { 1833 rLoads.add(regionServer.createRegionLoad(region, regionLoadBuilder, regionSpecifier)); 1834 } 1835 } catch (IOException e) { 1836 throw new ServiceException(e); 1837 } 1838 GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder(); 1839 builder.addAllRegionLoads(rLoads); 1840 return builder.build(); 1841 } 1842 1843 @Override 1844 @QosPriority(priority=HConstants.ADMIN_QOS) 1845 public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller, 1846 ClearCompactionQueuesRequest request) throws ServiceException { 1847 LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/" 1848 + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue"); 1849 ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder(); 1850 requestCount.increment(); 1851 if (clearCompactionQueues.compareAndSet(false,true)) { 1852 try { 1853 checkOpen(); 1854 regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues(); 1855 for (String queueName : request.getQueueNameList()) { 1856 LOG.debug("clear " + queueName + " compaction queue"); 1857 switch (queueName) { 1858 case "long": 1859 regionServer.compactSplitThread.clearLongCompactionsQueue(); 1860 break; 1861 case "short": 1862 regionServer.compactSplitThread.clearShortCompactionsQueue(); 1863 break; 1864 default: 1865 LOG.warn("Unknown queue name " + queueName); 1866 throw new IOException("Unknown queue name " + queueName); 1867 } 1868 } 1869 regionServer.getRegionServerCoprocessorHost().postClearCompactionQueues(); 1870 } catch (IOException ie) { 1871 throw new ServiceException(ie); 1872 } finally { 1873 clearCompactionQueues.set(false); 1874 } 1875 } else { 1876 LOG.warn("Clear 
compactions queue is executing by other admin."); 1877 } 1878 return respBuilder.build(); 1879 } 1880 1881 /** 1882 * Get some information of the region server. 1883 * 1884 * @param controller the RPC controller 1885 * @param request the request 1886 * @throws ServiceException 1887 */ 1888 @Override 1889 @QosPriority(priority=HConstants.ADMIN_QOS) 1890 public GetServerInfoResponse getServerInfo(final RpcController controller, 1891 final GetServerInfoRequest request) throws ServiceException { 1892 try { 1893 checkOpen(); 1894 } catch (IOException ie) { 1895 throw new ServiceException(ie); 1896 } 1897 requestCount.increment(); 1898 int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1; 1899 return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort); 1900 } 1901 1902 @Override 1903 @QosPriority(priority=HConstants.ADMIN_QOS) 1904 public GetStoreFileResponse getStoreFile(final RpcController controller, 1905 final GetStoreFileRequest request) throws ServiceException { 1906 try { 1907 checkOpen(); 1908 HRegion region = getRegion(request.getRegion()); 1909 requestCount.increment(); 1910 Set<byte[]> columnFamilies; 1911 if (request.getFamilyCount() == 0) { 1912 columnFamilies = region.getTableDescriptor().getColumnFamilyNames(); 1913 } else { 1914 columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR); 1915 for (ByteString cf: request.getFamilyList()) { 1916 columnFamilies.add(cf.toByteArray()); 1917 } 1918 } 1919 int nCF = columnFamilies.size(); 1920 List<String> fileList = region.getStoreFileList( 1921 columnFamilies.toArray(new byte[nCF][])); 1922 GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder(); 1923 builder.addAllStoreFile(fileList); 1924 return builder.build(); 1925 } catch (IOException ie) { 1926 throw new ServiceException(ie); 1927 } 1928 } 1929 1930 private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException { 1931 if (!request.hasServerStartCode()) { 
1932 LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList()); 1933 return; 1934 } 1935 throwOnWrongStartCode(request.getServerStartCode()); 1936 } 1937 1938 private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException { 1939 if (!request.hasServerStartCode()) { 1940 LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion()); 1941 return; 1942 } 1943 throwOnWrongStartCode(request.getServerStartCode()); 1944 } 1945 1946 private void throwOnWrongStartCode(long serverStartCode) throws ServiceException { 1947 // check that we are the same server that this RPC is intended for. 1948 if (regionServer.serverName.getStartcode() != serverStartCode) { 1949 throw new ServiceException(new DoNotRetryIOException( 1950 "This RPC was intended for a " + "different server with startCode: " + serverStartCode + 1951 ", this server is: " + regionServer.serverName)); 1952 } 1953 } 1954 1955 private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException { 1956 if (req.getOpenRegionCount() > 0) { 1957 for (OpenRegionRequest openReq : req.getOpenRegionList()) { 1958 throwOnWrongStartCode(openReq); 1959 } 1960 } 1961 if (req.getCloseRegionCount() > 0) { 1962 for (CloseRegionRequest closeReq : req.getCloseRegionList()) { 1963 throwOnWrongStartCode(closeReq); 1964 } 1965 } 1966 } 1967 1968 /** 1969 * Open asynchronously a region or a set of regions on the region server. 1970 * 1971 * The opening is coordinated by ZooKeeper, and this method requires the znode to be created 1972 * before being called. As a consequence, this method should be called only from the master. 1973 * <p> 1974 * Different manages states for the region are: 1975 * </p><ul> 1976 * <li>region not opened: the region opening will start asynchronously.</li> 1977 * <li>a close is already in progress: this is considered as an error.</li> 1978 * <li>an open is already in progress: this new open request will be ignored. 
This is important 1979 * because the Master can do multiple requests if it crashes.</li> 1980 * <li>the region is already opened: this new open request will be ignored.</li> 1981 * </ul> 1982 * <p> 1983 * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign. 1984 * For a single region opening, errors are sent through a ServiceException. For bulk assign, 1985 * errors are put in the response as FAILED_OPENING. 1986 * </p> 1987 * @param controller the RPC controller 1988 * @param request the request 1989 * @throws ServiceException 1990 */ 1991 @Override 1992 @QosPriority(priority=HConstants.ADMIN_QOS) 1993 public OpenRegionResponse openRegion(final RpcController controller, 1994 final OpenRegionRequest request) throws ServiceException { 1995 requestCount.increment(); 1996 throwOnWrongStartCode(request); 1997 1998 OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder(); 1999 final int regionCount = request.getOpenInfoCount(); 2000 final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount); 2001 final boolean isBulkAssign = regionCount > 1; 2002 try { 2003 checkOpen(); 2004 } catch (IOException ie) { 2005 TableName tableName = null; 2006 if (regionCount == 1) { 2007 org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri = request.getOpenInfo(0).getRegion(); 2008 if (ri != null) { 2009 tableName = ProtobufUtil.toTableName(ri.getTableName()); 2010 } 2011 } 2012 if (!TableName.META_TABLE_NAME.equals(tableName)) { 2013 throw new ServiceException(ie); 2014 } 2015 // We are assigning meta, wait a little for regionserver to finish initialization. 
2016 int timeout = regionServer.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 2017 HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout 2018 long endTime = System.currentTimeMillis() + timeout; 2019 synchronized (regionServer.online) { 2020 try { 2021 while (System.currentTimeMillis() <= endTime 2022 && !regionServer.isStopped() && !regionServer.isOnline()) { 2023 regionServer.online.wait(regionServer.msgInterval); 2024 } 2025 checkOpen(); 2026 } catch (InterruptedException t) { 2027 Thread.currentThread().interrupt(); 2028 throw new ServiceException(t); 2029 } catch (IOException e) { 2030 throw new ServiceException(e); 2031 } 2032 } 2033 } 2034 2035 long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1; 2036 2037 for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) { 2038 final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion()); 2039 TableDescriptor htd; 2040 try { 2041 String encodedName = region.getEncodedName(); 2042 byte[] encodedNameBytes = region.getEncodedNameAsBytes(); 2043 final HRegion onlineRegion = regionServer.getRegion(encodedName); 2044 if (onlineRegion != null) { 2045 // The region is already online. This should not happen any more. 2046 String error = "Received OPEN for the region:" 2047 + region.getRegionNameAsString() + ", which is already online"; 2048 LOG.warn(error); 2049 //regionServer.abort(error); 2050 //throw new IOException(error); 2051 builder.addOpeningState(RegionOpeningState.OPENED); 2052 continue; 2053 } 2054 LOG.info("Open " + region.getRegionNameAsString()); 2055 2056 final Boolean previous = regionServer.getRegionsInTransitionInRS().putIfAbsent( 2057 encodedNameBytes, Boolean.TRUE); 2058 2059 if (Boolean.FALSE.equals(previous)) { 2060 if (regionServer.getRegion(encodedName) != null) { 2061 // There is a close in progress. This should not happen any more. 
2062 String error = "Received OPEN for the region:" 2063 + region.getRegionNameAsString() + ", which we are already trying to CLOSE"; 2064 regionServer.abort(error); 2065 throw new IOException(error); 2066 } 2067 regionServer.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE); 2068 } 2069 2070 if (Boolean.TRUE.equals(previous)) { 2071 // An open is in progress. This is supported, but let's log this. 2072 LOG.info("Receiving OPEN for the region:" + 2073 region.getRegionNameAsString() + ", which we are already trying to OPEN" 2074 + " - ignoring this new request for this region."); 2075 } 2076 2077 // We are opening this region. If it moves back and forth for whatever reason, we don't 2078 // want to keep returning the stale moved record while we are opening/if we close again. 2079 regionServer.removeFromMovedRegions(region.getEncodedName()); 2080 2081 if (previous == null || !previous.booleanValue()) { 2082 htd = htds.get(region.getTable()); 2083 if (htd == null) { 2084 htd = regionServer.tableDescriptors.get(region.getTable()); 2085 htds.put(region.getTable(), htd); 2086 } 2087 if (htd == null) { 2088 throw new IOException("Missing table descriptor for " + region.getEncodedName()); 2089 } 2090 // If there is no action in progress, we can submit a specific handler. 2091 // Need to pass the expected version in the constructor. 
2092 if (regionServer.executorService == null) { 2093 LOG.info("No executor executorService; skipping open request"); 2094 } else { 2095 if (region.isMetaRegion()) { 2096 regionServer.executorService.submit(new OpenMetaHandler( 2097 regionServer, regionServer, region, htd, masterSystemTime)); 2098 } else { 2099 if (regionOpenInfo.getFavoredNodesCount() > 0) { 2100 regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(), 2101 regionOpenInfo.getFavoredNodesList()); 2102 } 2103 if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) { 2104 regionServer.executorService.submit(new OpenPriorityRegionHandler( 2105 regionServer, regionServer, region, htd, masterSystemTime)); 2106 } else { 2107 regionServer.executorService.submit(new OpenRegionHandler( 2108 regionServer, regionServer, region, htd, masterSystemTime)); 2109 } 2110 } 2111 } 2112 } 2113 2114 builder.addOpeningState(RegionOpeningState.OPENED); 2115 } catch (IOException ie) { 2116 LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie); 2117 if (isBulkAssign) { 2118 builder.addOpeningState(RegionOpeningState.FAILED_OPENING); 2119 } else { 2120 throw new ServiceException(ie); 2121 } 2122 } 2123 } 2124 return builder.build(); 2125 } 2126 2127 /** 2128 * Warmup a region on this server. 2129 * This method should only be called by Master. It synchronously opens the region and 2130 * closes the region bringing the most important pages in cache. 
2131 */ 2132 @Override 2133 public WarmupRegionResponse warmupRegion(final RpcController controller, 2134 final WarmupRegionRequest request) throws ServiceException { 2135 final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo()); 2136 WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance(); 2137 try { 2138 checkOpen(); 2139 String encodedName = region.getEncodedName(); 2140 byte[] encodedNameBytes = region.getEncodedNameAsBytes(); 2141 final HRegion onlineRegion = regionServer.getRegion(encodedName); 2142 if (onlineRegion != null) { 2143 LOG.info("{} is online; skipping warmup", region); 2144 return response; 2145 } 2146 TableDescriptor htd = regionServer.tableDescriptors.get(region.getTable()); 2147 if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) { 2148 LOG.info("{} is in transition; skipping warmup", region); 2149 return response; 2150 } 2151 LOG.info("Warmup {}", region.getRegionNameAsString()); 2152 HRegion.warmupHRegion(region, htd, regionServer.getWAL(region), 2153 regionServer.getConfiguration(), regionServer, null); 2154 } catch (IOException ie) { 2155 LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie); 2156 throw new ServiceException(ie); 2157 } 2158 2159 return response; 2160 } 2161 2162 /** 2163 * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is 2164 * that the given mutations will be durable on the receiving RS if this method returns without any 2165 * exception. 
2166 * @param controller the RPC controller 2167 * @param request the request 2168 * @throws ServiceException 2169 */ 2170 @Override 2171 @QosPriority(priority = HConstants.REPLAY_QOS) 2172 public ReplicateWALEntryResponse replay(final RpcController controller, 2173 final ReplicateWALEntryRequest request) throws ServiceException { 2174 long before = EnvironmentEdgeManager.currentTime(); 2175 CellScanner cells = ((HBaseRpcController) controller).cellScanner(); 2176 try { 2177 checkOpen(); 2178 List<WALEntry> entries = request.getEntryList(); 2179 if (entries == null || entries.isEmpty()) { 2180 // empty input 2181 return ReplicateWALEntryResponse.newBuilder().build(); 2182 } 2183 ByteString regionName = entries.get(0).getKey().getEncodedRegionName(); 2184 HRegion region = regionServer.getRegionByEncodedName(regionName.toStringUtf8()); 2185 RegionCoprocessorHost coprocessorHost = 2186 ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo()) 2187 ? region.getCoprocessorHost() 2188 : null; // do not invoke coprocessors if this is a secondary region replica 2189 List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>(); 2190 2191 // Skip adding the edits to WAL if this is a secondary region replica 2192 boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); 2193 Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL; 2194 2195 for (WALEntry entry : entries) { 2196 if (!regionName.equals(entry.getKey().getEncodedRegionName())) { 2197 throw new NotServingRegionException("Replay request contains entries from multiple " + 2198 "regions. First region:" + regionName.toStringUtf8() + " , other region:" 2199 + entry.getKey().getEncodedRegionName()); 2200 } 2201 if (regionServer.nonceManager != null && isPrimary) { 2202 long nonceGroup = entry.getKey().hasNonceGroup() 2203 ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; 2204 long nonce = entry.getKey().hasNonce() ? 
entry.getKey().getNonce() : HConstants.NO_NONCE; 2205 regionServer.nonceManager.reportOperationFromWal( 2206 nonceGroup, 2207 nonce, 2208 entry.getKey().getWriteTime()); 2209 } 2210 Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>(); 2211 List<MutationReplay> edits = WALSplitUtil.getMutationsFromWALEntry(entry, 2212 cells, walEntry, durability); 2213 if (coprocessorHost != null) { 2214 // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a 2215 // KeyValue. 2216 if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(), 2217 walEntry.getSecond())) { 2218 // if bypass this log entry, ignore it ... 2219 continue; 2220 } 2221 walEntries.add(walEntry); 2222 } 2223 if(edits!=null && !edits.isEmpty()) { 2224 // HBASE-17924 2225 // sort to improve lock efficiency 2226 Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation)); 2227 long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? 
2228 entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber(); 2229 OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId); 2230 // check if it's a partial success 2231 for (int i = 0; result != null && i < result.length; i++) { 2232 if (result[i] != OperationStatus.SUCCESS) { 2233 throw new IOException(result[i].getExceptionMsg()); 2234 } 2235 } 2236 } 2237 } 2238 2239 //sync wal at the end because ASYNC_WAL is used above 2240 WAL wal = region.getWAL(); 2241 if (wal != null) { 2242 wal.sync(); 2243 } 2244 2245 if (coprocessorHost != null) { 2246 for (Pair<WALKey, WALEdit> entry : walEntries) { 2247 coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(), 2248 entry.getSecond()); 2249 } 2250 } 2251 return ReplicateWALEntryResponse.newBuilder().build(); 2252 } catch (IOException ie) { 2253 throw new ServiceException(ie); 2254 } finally { 2255 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 2256 if (metricsRegionServer != null) { 2257 metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before); 2258 } 2259 } 2260 } 2261 2262 /** 2263 * Replicate WAL entries on the region server. 
2264 * 2265 * @param controller the RPC controller 2266 * @param request the request 2267 */ 2268 @Override 2269 @QosPriority(priority=HConstants.REPLICATION_QOS) 2270 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, 2271 final ReplicateWALEntryRequest request) throws ServiceException { 2272 try { 2273 checkOpen(); 2274 if (regionServer.getReplicationSinkService() != null) { 2275 requestCount.increment(); 2276 List<WALEntry> entries = request.getEntryList(); 2277 CellScanner cellScanner = ((HBaseRpcController)controller).cellScanner(); 2278 regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(); 2279 regionServer.getReplicationSinkService().replicateLogEntries(entries, cellScanner, 2280 request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), 2281 request.getSourceHFileArchiveDirPath()); 2282 regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(); 2283 return ReplicateWALEntryResponse.newBuilder().build(); 2284 } else { 2285 throw new ServiceException("Replication services are not initialized yet"); 2286 } 2287 } catch (IOException ie) { 2288 throw new ServiceException(ie); 2289 } 2290 } 2291 2292 /** 2293 * Roll the WAL writer of the region server. 
2294 * @param controller the RPC controller 2295 * @param request the request 2296 * @throws ServiceException 2297 */ 2298 @Override 2299 public RollWALWriterResponse rollWALWriter(final RpcController controller, 2300 final RollWALWriterRequest request) throws ServiceException { 2301 try { 2302 checkOpen(); 2303 requestCount.increment(); 2304 regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest(); 2305 regionServer.getWalRoller().requestRollAll(); 2306 regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest(); 2307 RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder(); 2308 return builder.build(); 2309 } catch (IOException ie) { 2310 throw new ServiceException(ie); 2311 } 2312 } 2313 2314 2315 /** 2316 * Stop the region server. 2317 * 2318 * @param controller the RPC controller 2319 * @param request the request 2320 * @throws ServiceException 2321 */ 2322 @Override 2323 @QosPriority(priority=HConstants.ADMIN_QOS) 2324 public StopServerResponse stopServer(final RpcController controller, 2325 final StopServerRequest request) throws ServiceException { 2326 requestCount.increment(); 2327 String reason = request.getReason(); 2328 regionServer.stop(reason); 2329 return StopServerResponse.newBuilder().build(); 2330 } 2331 2332 @Override 2333 public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller, 2334 UpdateFavoredNodesRequest request) throws ServiceException { 2335 List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList(); 2336 UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder(); 2337 for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) { 2338 RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion()); 2339 if (regionUpdateInfo.getFavoredNodesCount() > 0) { 2340 regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(), 2341 regionUpdateInfo.getFavoredNodesList()); 2342 } 2343 
} 2344 respBuilder.setResponse(openInfoList.size()); 2345 return respBuilder.build(); 2346 } 2347 2348 /** 2349 * Atomically bulk load several HFiles into an open region 2350 * @return true if successful, false is failed but recoverably (no action) 2351 * @throws ServiceException if failed unrecoverably 2352 */ 2353 @Override 2354 public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller, 2355 final BulkLoadHFileRequest request) throws ServiceException { 2356 long start = EnvironmentEdgeManager.currentTime(); 2357 List<String> clusterIds = new ArrayList<>(request.getClusterIdsList()); 2358 if(clusterIds.contains(this.regionServer.clusterId)){ 2359 return BulkLoadHFileResponse.newBuilder().setLoaded(true).build(); 2360 } else { 2361 clusterIds.add(this.regionServer.clusterId); 2362 } 2363 try { 2364 checkOpen(); 2365 requestCount.increment(); 2366 HRegion region = getRegion(request.getRegion()); 2367 Map<byte[], List<Path>> map = null; 2368 final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration()); 2369 long sizeToBeLoaded = -1; 2370 2371 // Check to see if this bulk load would exceed the space quota for this table 2372 if (spaceQuotaEnabled) { 2373 ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements(); 2374 SpaceViolationPolicyEnforcement enforcement = activeSpaceQuotas.getPolicyEnforcement( 2375 region); 2376 if (enforcement != null) { 2377 // Bulk loads must still be atomic. We must enact all or none. 
2378 List<String> filePaths = new ArrayList<>(request.getFamilyPathCount()); 2379 for (FamilyPath familyPath : request.getFamilyPathList()) { 2380 filePaths.add(familyPath.getPath()); 2381 } 2382 // Check if the batch of files exceeds the current quota 2383 sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths); 2384 } 2385 } 2386 2387 List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount()); 2388 for (FamilyPath familyPath : request.getFamilyPathList()) { 2389 familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath())); 2390 } 2391 if (!request.hasBulkToken()) { 2392 if (region.getCoprocessorHost() != null) { 2393 region.getCoprocessorHost().preBulkLoadHFile(familyPaths); 2394 } 2395 try { 2396 map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null, 2397 request.getCopyFile(), clusterIds, request.getReplicate()); 2398 } finally { 2399 if (region.getCoprocessorHost() != null) { 2400 region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map); 2401 } 2402 } 2403 } else { 2404 // secure bulk load 2405 map = regionServer.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds); 2406 } 2407 BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder(); 2408 builder.setLoaded(map != null); 2409 if (map != null) { 2410 // Treat any negative size as a flag to "ignore" updating the region size as that is 2411 // not possible to occur in real life (cannot bulk load a file with negative size) 2412 if (spaceQuotaEnabled && sizeToBeLoaded > 0) { 2413 if (LOG.isTraceEnabled()) { 2414 LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by " 2415 + sizeToBeLoaded + " bytes"); 2416 } 2417 // Inform space quotas of the new files for this region 2418 getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize( 2419 region.getRegionInfo(), sizeToBeLoaded); 2420 } 2421 } 2422 return builder.build(); 2423 } catch 
(IOException ie) { 2424 throw new ServiceException(ie); 2425 } finally { 2426 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 2427 if (metricsRegionServer != null) { 2428 metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start); 2429 } 2430 } 2431 } 2432 2433 @Override 2434 public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller, 2435 PrepareBulkLoadRequest request) throws ServiceException { 2436 try { 2437 checkOpen(); 2438 requestCount.increment(); 2439 2440 HRegion region = getRegion(request.getRegion()); 2441 2442 String bulkToken = regionServer.getSecureBulkLoadManager().prepareBulkLoad(region, request); 2443 PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder(); 2444 builder.setBulkToken(bulkToken); 2445 return builder.build(); 2446 } catch (IOException ie) { 2447 throw new ServiceException(ie); 2448 } 2449 } 2450 2451 @Override 2452 public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller, 2453 CleanupBulkLoadRequest request) throws ServiceException { 2454 try { 2455 checkOpen(); 2456 requestCount.increment(); 2457 2458 HRegion region = getRegion(request.getRegion()); 2459 2460 regionServer.getSecureBulkLoadManager().cleanupBulkLoad(region, request); 2461 return CleanupBulkLoadResponse.newBuilder().build(); 2462 } catch (IOException ie) { 2463 throw new ServiceException(ie); 2464 } 2465 } 2466 2467 @Override 2468 public CoprocessorServiceResponse execService(final RpcController controller, 2469 final CoprocessorServiceRequest request) throws ServiceException { 2470 try { 2471 checkOpen(); 2472 requestCount.increment(); 2473 HRegion region = getRegion(request.getRegion()); 2474 com.google.protobuf.Message result = execServiceOnRegion(region, request.getCall()); 2475 CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder(); 2476 builder.setRegion(RequestConverter.buildRegionSpecifier( 2477 RegionSpecifierType.REGION_NAME, 
region.getRegionInfo().getRegionName())); 2478 // TODO: COPIES!!!!!! 2479 builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()). 2480 setValue(org.apache.hbase.thirdparty.com.google.protobuf.ByteString. 2481 copyFrom(result.toByteArray()))); 2482 return builder.build(); 2483 } catch (IOException ie) { 2484 throw new ServiceException(ie); 2485 } 2486 } 2487 2488 private com.google.protobuf.Message execServiceOnRegion(HRegion region, 2489 final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException { 2490 // ignore the passed in controller (from the serialized call) 2491 ServerRpcController execController = new ServerRpcController(); 2492 return region.execService(execController, serviceCall); 2493 } 2494 2495 /** 2496 * Get data from a table. 2497 * 2498 * @param controller the RPC controller 2499 * @param request the get request 2500 * @throws ServiceException 2501 */ 2502 @Override 2503 public GetResponse get(final RpcController controller, 2504 final GetRequest request) throws ServiceException { 2505 long before = EnvironmentEdgeManager.currentTime(); 2506 OperationQuota quota = null; 2507 HRegion region = null; 2508 try { 2509 checkOpen(); 2510 requestCount.increment(); 2511 rpcGetRequestCount.increment(); 2512 region = getRegion(request.getRegion()); 2513 2514 GetResponse.Builder builder = GetResponse.newBuilder(); 2515 ClientProtos.Get get = request.getGet(); 2516 // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do 2517 // a get closest before. Throwing the UnknownProtocolException signals it that it needs 2518 // to switch and do hbase2 protocol (HBase servers do not tell clients what versions 2519 // they are; its a problem for non-native clients like asynchbase. HBASE-20225. 2520 if (get.hasClosestRowBefore() && get.getClosestRowBefore()) { 2521 throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? 
" + 2522 "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " + 2523 "reverse Scan."); 2524 } 2525 Boolean existence = null; 2526 Result r = null; 2527 RpcCallContext context = RpcServer.getCurrentCall().orElse(null); 2528 quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET); 2529 2530 Get clientGet = ProtobufUtil.toGet(get); 2531 if (get.getExistenceOnly() && region.getCoprocessorHost() != null) { 2532 existence = region.getCoprocessorHost().preExists(clientGet); 2533 } 2534 if (existence == null) { 2535 if (context != null) { 2536 r = get(clientGet, (region), null, context); 2537 } else { 2538 // for test purpose 2539 r = region.get(clientGet); 2540 } 2541 if (get.getExistenceOnly()) { 2542 boolean exists = r.getExists(); 2543 if (region.getCoprocessorHost() != null) { 2544 exists = region.getCoprocessorHost().postExists(clientGet, exists); 2545 } 2546 existence = exists; 2547 } 2548 } 2549 if (existence != null) { 2550 ClientProtos.Result pbr = 2551 ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0); 2552 builder.setResult(pbr); 2553 } else if (r != null) { 2554 ClientProtos.Result pbr; 2555 if (isClientCellBlockSupport(context) && controller instanceof HBaseRpcController 2556 && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)) { 2557 pbr = ProtobufUtil.toResultNoData(r); 2558 ((HBaseRpcController) controller).setCellScanner(CellUtil.createCellScanner(r 2559 .rawCells())); 2560 addSize(context, r, null); 2561 } else { 2562 pbr = ProtobufUtil.toResult(r); 2563 } 2564 builder.setResult(pbr); 2565 } 2566 //r.cells is null when an table.exists(get) call 2567 if (r != null && r.rawCells() != null) { 2568 quota.addGetResult(r); 2569 } 2570 return builder.build(); 2571 } catch (IOException ie) { 2572 throw new ServiceException(ie); 2573 } finally { 2574 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 2575 if (metricsRegionServer != null) { 
2576 TableDescriptor td = region != null? region.getTableDescriptor(): null; 2577 if (td != null) { 2578 metricsRegionServer.updateGet( 2579 td.getTableName(), EnvironmentEdgeManager.currentTime() - before); 2580 } 2581 } 2582 if (quota != null) { 2583 quota.close(); 2584 } 2585 } 2586 } 2587 2588 private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack, 2589 RpcCallContext context) throws IOException { 2590 region.prepareGet(get); 2591 boolean stale = region.getRegionInfo().getReplicaId() != 0; 2592 2593 // This method is almost the same as HRegion#get. 2594 List<Cell> results = new ArrayList<>(); 2595 long before = EnvironmentEdgeManager.currentTime(); 2596 // pre-get CP hook 2597 if (region.getCoprocessorHost() != null) { 2598 if (region.getCoprocessorHost().preGet(get, results)) { 2599 region.metricsUpdateForGet(results, before); 2600 return Result 2601 .create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); 2602 } 2603 } 2604 Scan scan = new Scan(get); 2605 if (scan.getLoadColumnFamiliesOnDemandValue() == null) { 2606 scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); 2607 } 2608 RegionScannerImpl scanner = null; 2609 try { 2610 scanner = region.getScanner(scan); 2611 scanner.next(results); 2612 } finally { 2613 if (scanner != null) { 2614 if (closeCallBack == null) { 2615 // If there is a context then the scanner can be added to the current 2616 // RpcCallContext. The rpc callback will take care of closing the 2617 // scanner, for eg in case 2618 // of get() 2619 context.setCallBack(scanner); 2620 } else { 2621 // The call is from multi() where the results from the get() are 2622 // aggregated and then send out to the 2623 // rpc. The rpccall back will close all such scanners created as part 2624 // of multi(). 
2625 closeCallBack.addScanner(scanner); 2626 } 2627 } 2628 } 2629 2630 // post-get CP hook 2631 if (region.getCoprocessorHost() != null) { 2632 region.getCoprocessorHost().postGet(get, results); 2633 } 2634 region.metricsUpdateForGet(results, before); 2635 2636 return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); 2637 } 2638 2639 private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException { 2640 int sum = 0; 2641 String firstRegionName = null; 2642 for (RegionAction regionAction : request.getRegionActionList()) { 2643 if (sum == 0) { 2644 firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray()); 2645 } 2646 sum += regionAction.getActionCount(); 2647 } 2648 if (sum > rowSizeWarnThreshold) { 2649 LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold + 2650 ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: " + 2651 RpcServer.getRequestUserName().orElse(null) + "/" + 2652 RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName); 2653 if (rejectRowsWithSizeOverThreshold) { 2654 throw new ServiceException( 2655 "Rejecting large batch operation for current batch with firstRegionName: " + 2656 firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: " + 2657 rowSizeWarnThreshold); 2658 } 2659 } 2660 } 2661 2662 private void failRegionAction(MultiResponse.Builder responseBuilder, 2663 RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction, 2664 CellScanner cellScanner, Throwable error) { 2665 rpcServer.getMetrics().exception(error); 2666 regionActionResultBuilder.setException(ResponseConverter.buildException(error)); 2667 responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); 2668 // All Mutations in this RegionAction not executed as we can not see the Region online here 2669 // in this RS. Will be retried from Client. 
Skipping all the Cells in CellScanner 2670 // corresponding to these Mutations. 2671 if (cellScanner != null) { 2672 skipCellsForMutations(regionAction.getActionList(), cellScanner); 2673 } 2674 } 2675 2676 /** 2677 * Execute multiple actions on a table: get, mutate, and/or execCoprocessor 2678 * 2679 * @param rpcc the RPC controller 2680 * @param request the multi request 2681 * @throws ServiceException 2682 */ 2683 @Override 2684 public MultiResponse multi(final RpcController rpcc, final MultiRequest request) 2685 throws ServiceException { 2686 try { 2687 checkOpen(); 2688 } catch (IOException ie) { 2689 throw new ServiceException(ie); 2690 } 2691 2692 checkBatchSizeAndLogLargeSize(request); 2693 2694 // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data. 2695 // It is also the conduit via which we pass back data. 2696 HBaseRpcController controller = (HBaseRpcController)rpcc; 2697 CellScanner cellScanner = controller != null ? controller.cellScanner(): null; 2698 if (controller != null) { 2699 controller.setCellScanner(null); 2700 } 2701 2702 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; 2703 2704 MultiResponse.Builder responseBuilder = MultiResponse.newBuilder(); 2705 RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder(); 2706 this.rpcMultiRequestCount.increment(); 2707 this.requestCount.increment(); 2708 ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements(); 2709 2710 // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The 2711 // following logic is for backward compatibility as old clients still use 2712 // MultiRequest#condition in case of checkAndMutate with RowMutations. 2713 if (request.hasCondition()) { 2714 if (request.getRegionActionList().isEmpty()) { 2715 // If the region action list is empty, do nothing. 
2716 responseBuilder.setProcessed(true); 2717 return responseBuilder.build(); 2718 } 2719 2720 RegionAction regionAction = request.getRegionAction(0); 2721 2722 // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So 2723 // we can assume regionAction.getAtomic() is true here. 2724 assert regionAction.getAtomic(); 2725 2726 OperationQuota quota; 2727 HRegion region; 2728 RegionSpecifier regionSpecifier = regionAction.getRegion(); 2729 2730 try { 2731 region = getRegion(regionSpecifier); 2732 quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList()); 2733 } catch (IOException e) { 2734 failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e); 2735 return responseBuilder.build(); 2736 } 2737 2738 try { 2739 CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(), 2740 cellScanner, request.getCondition(), spaceQuotaEnforcement); 2741 responseBuilder.setProcessed(result.isSuccess()); 2742 ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = 2743 ClientProtos.ResultOrException.newBuilder(); 2744 for (int i = 0; i < regionAction.getActionCount(); i++) { 2745 // To unify the response format with doNonAtomicRegionMutation and read through 2746 // client's AsyncProcess we have to add an empty result instance per operation 2747 resultOrExceptionOrBuilder.clear(); 2748 resultOrExceptionOrBuilder.setIndex(i); 2749 regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build()); 2750 } 2751 } catch (IOException e) { 2752 rpcServer.getMetrics().exception(e); 2753 // As it's an atomic operation with a condition, we may expect it's a global failure. 
2754 regionActionResultBuilder.setException(ResponseConverter.buildException(e)); 2755 } finally { 2756 quota.close(); 2757 } 2758 2759 responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); 2760 ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics(); 2761 if (regionLoadStats != null) { 2762 responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder() 2763 .addRegion(regionSpecifier).addStat(regionLoadStats).build()); 2764 } 2765 return responseBuilder.build(); 2766 } 2767 2768 // this will contain all the cells that we need to return. It's created later, if needed. 2769 List<CellScannable> cellsToReturn = null; 2770 RegionScannersCloseCallBack closeCallBack = null; 2771 RpcCallContext context = RpcServer.getCurrentCall().orElse(null); 2772 Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new HashMap<>(request 2773 .getRegionActionCount()); 2774 2775 for (RegionAction regionAction : request.getRegionActionList()) { 2776 OperationQuota quota; 2777 HRegion region; 2778 RegionSpecifier regionSpecifier = regionAction.getRegion(); 2779 regionActionResultBuilder.clear(); 2780 2781 try { 2782 region = getRegion(regionSpecifier); 2783 quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList()); 2784 } catch (IOException e) { 2785 failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e); 2786 continue; // For this region it's a failure. 
2787 } 2788 2789 try { 2790 if (regionAction.hasCondition()) { 2791 try { 2792 ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = 2793 ClientProtos.ResultOrException.newBuilder(); 2794 if (regionAction.getActionCount() == 1) { 2795 CheckAndMutateResult result = checkAndMutate(region, quota, 2796 regionAction.getAction(0).getMutation(), cellScanner, 2797 regionAction.getCondition(), spaceQuotaEnforcement); 2798 regionActionResultBuilder.setProcessed(result.isSuccess()); 2799 resultOrExceptionOrBuilder.setIndex(0); 2800 if (result.getResult() != null) { 2801 resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult())); 2802 } 2803 regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build()); 2804 } else { 2805 CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(), 2806 cellScanner, regionAction.getCondition(), spaceQuotaEnforcement); 2807 regionActionResultBuilder.setProcessed(result.isSuccess()); 2808 for (int i = 0; i < regionAction.getActionCount(); i++) { 2809 if (i == 0 && result.getResult() != null) { 2810 // Set the result of the Increment/Append operations to the first element of the 2811 // ResultOrException list 2812 resultOrExceptionOrBuilder.setIndex(i); 2813 regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder 2814 .setResult(ProtobufUtil.toResult(result.getResult())).build()); 2815 continue; 2816 } 2817 // To unify the response format with doNonAtomicRegionMutation and read through 2818 // client's AsyncProcess we have to add an empty result instance per operation 2819 resultOrExceptionOrBuilder.clear(); 2820 resultOrExceptionOrBuilder.setIndex(i); 2821 regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build()); 2822 } 2823 } 2824 } catch (IOException e) { 2825 rpcServer.getMetrics().exception(e); 2826 // As it's an atomic operation with a condition, we may expect it's a global failure. 
2827 regionActionResultBuilder.setException(ResponseConverter.buildException(e)); 2828 } 2829 } else if (regionAction.hasAtomic() && regionAction.getAtomic()) { 2830 try { 2831 doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(), 2832 cellScanner, spaceQuotaEnforcement); 2833 regionActionResultBuilder.setProcessed(true); 2834 // We no longer use MultiResponse#processed. Instead, we use 2835 // RegionActionResult#processed. This is for backward compatibility for old clients. 2836 responseBuilder.setProcessed(true); 2837 } catch (IOException e) { 2838 rpcServer.getMetrics().exception(e); 2839 // As it's atomic, we may expect it's a global failure. 2840 regionActionResultBuilder.setException(ResponseConverter.buildException(e)); 2841 } 2842 } else { 2843 // doNonAtomicRegionMutation manages the exception internally 2844 if (context != null && closeCallBack == null) { 2845 // An RpcCallBack that creates a list of scanners that needs to perform callBack 2846 // operation on completion of multiGets. 2847 // Set this only once 2848 closeCallBack = new RegionScannersCloseCallBack(); 2849 context.setCallBack(closeCallBack); 2850 } 2851 cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner, 2852 regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context, 2853 spaceQuotaEnforcement); 2854 } 2855 } finally { 2856 quota.close(); 2857 } 2858 2859 responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); 2860 ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics(); 2861 if(regionLoadStats != null) { 2862 regionStats.put(regionSpecifier, regionLoadStats); 2863 } 2864 } 2865 // Load the controller with the Cells to return. 
2866 if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) { 2867 controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn)); 2868 } 2869 2870 MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder(); 2871 for(Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat: regionStats.entrySet()){ 2872 builder.addRegion(stat.getKey()); 2873 builder.addStat(stat.getValue()); 2874 } 2875 responseBuilder.setRegionStatistics(builder); 2876 return responseBuilder.build(); 2877 } 2878 2879 private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) { 2880 if (cellScanner == null) { 2881 return; 2882 } 2883 for (Action action : actions) { 2884 skipCellsForMutation(action, cellScanner); 2885 } 2886 } 2887 2888 private void skipCellsForMutation(Action action, CellScanner cellScanner) { 2889 if (cellScanner == null) { 2890 return; 2891 } 2892 try { 2893 if (action.hasMutation()) { 2894 MutationProto m = action.getMutation(); 2895 if (m.hasAssociatedCellCount()) { 2896 for (int i = 0; i < m.getAssociatedCellCount(); i++) { 2897 cellScanner.advance(); 2898 } 2899 } 2900 } 2901 } catch (IOException e) { 2902 // No need to handle these Individual Muatation level issue. Any way this entire RegionAction 2903 // marked as failed as we could not see the Region here. At client side the top level 2904 // RegionAction exception will be considered first. 2905 LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e); 2906 } 2907 } 2908 2909 /** 2910 * Mutate data in a table. 2911 * 2912 * @param rpcc the RPC controller 2913 * @param request the mutate request 2914 */ 2915 @Override 2916 public MutateResponse mutate(final RpcController rpcc, 2917 final MutateRequest request) throws ServiceException { 2918 // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data. 2919 // It is also the conduit via which we pass back data. 
2920 HBaseRpcController controller = (HBaseRpcController)rpcc; 2921 CellScanner cellScanner = controller != null ? controller.cellScanner() : null; 2922 OperationQuota quota = null; 2923 RpcCallContext context = RpcServer.getCurrentCall().orElse(null); 2924 // Clear scanner so we are not holding on to reference across call. 2925 if (controller != null) { 2926 controller.setCellScanner(null); 2927 } 2928 try { 2929 checkOpen(); 2930 requestCount.increment(); 2931 rpcMutateRequestCount.increment(); 2932 HRegion region = getRegion(request.getRegion()); 2933 MutateResponse.Builder builder = MutateResponse.newBuilder(); 2934 MutationProto mutation = request.getMutation(); 2935 if (!region.getRegionInfo().isMetaRegion()) { 2936 regionServer.getMemStoreFlusher().reclaimMemStoreMemory(); 2937 } 2938 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; 2939 quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE); 2940 ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager() 2941 .getActiveEnforcements(); 2942 2943 if (request.hasCondition()) { 2944 CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner, 2945 request.getCondition(), spaceQuotaEnforcement); 2946 builder.setProcessed(result.isSuccess()); 2947 boolean clientCellBlockSupported = isClientCellBlockSupport(context); 2948 addResult(builder, result.getResult(), controller, clientCellBlockSupported); 2949 if (clientCellBlockSupported) { 2950 addSize(context, result.getResult(), null); 2951 } 2952 } else { 2953 Result r = null; 2954 Boolean processed = null; 2955 MutationType type = mutation.getMutateType(); 2956 switch (type) { 2957 case APPEND: 2958 // TODO: this doesn't actually check anything. 2959 r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); 2960 break; 2961 case INCREMENT: 2962 // TODO: this doesn't actually check anything. 
2963 r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); 2964 break; 2965 case PUT: 2966 put(region, quota, mutation, cellScanner, spaceQuotaEnforcement); 2967 processed = Boolean.TRUE; 2968 break; 2969 case DELETE: 2970 delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement); 2971 processed = Boolean.TRUE; 2972 break; 2973 default: 2974 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); 2975 } 2976 if (processed != null) { 2977 builder.setProcessed(processed); 2978 } 2979 boolean clientCellBlockSupported = isClientCellBlockSupport(context); 2980 addResult(builder, r, controller, clientCellBlockSupported); 2981 if (clientCellBlockSupported) { 2982 addSize(context, r, null); 2983 } 2984 } 2985 return builder.build(); 2986 } catch (IOException ie) { 2987 regionServer.checkFileSystem(); 2988 throw new ServiceException(ie); 2989 } finally { 2990 if (quota != null) { 2991 quota.close(); 2992 } 2993 } 2994 } 2995 2996 private void put(HRegion region, OperationQuota quota, MutationProto mutation, 2997 CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException { 2998 long before = EnvironmentEdgeManager.currentTime(); 2999 Put put = ProtobufUtil.toPut(mutation, cellScanner); 3000 checkCellSizeLimit(region, put); 3001 spaceQuota.getPolicyEnforcement(region).check(put); 3002 quota.addMutation(put); 3003 region.put(put); 3004 3005 MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 3006 if (metricsRegionServer != null) { 3007 long after = EnvironmentEdgeManager.currentTime(); 3008 metricsRegionServer.updatePut(region.getRegionInfo().getTable(), after - before); 3009 } 3010 } 3011 3012 private void delete(HRegion region, OperationQuota quota, MutationProto mutation, 3013 CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException { 3014 long before = EnvironmentEdgeManager.currentTime(); 3015 Delete delete = ProtobufUtil.toDelete(mutation, 
cellScanner); 3016 checkCellSizeLimit(region, delete); 3017 spaceQuota.getPolicyEnforcement(region).check(delete); 3018 quota.addMutation(delete); 3019 region.delete(delete); 3020 3021 MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 3022 if (metricsRegionServer != null) { 3023 long after = EnvironmentEdgeManager.currentTime(); 3024 metricsRegionServer.updateDelete(region.getRegionInfo().getTable(), after - before); 3025 } 3026 } 3027 3028 private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota, 3029 MutationProto mutation, CellScanner cellScanner, Condition condition, 3030 ActivePolicyEnforcement spaceQuota) throws IOException { 3031 long before = EnvironmentEdgeManager.currentTime(); 3032 CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, 3033 cellScanner); 3034 checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction()); 3035 spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction()); 3036 quota.addMutation((Mutation) checkAndMutate.getAction()); 3037 3038 CheckAndMutateResult result = null; 3039 if (region.getCoprocessorHost() != null) { 3040 result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate); 3041 } 3042 if (result == null) { 3043 result = region.checkAndMutate(checkAndMutate); 3044 if (region.getCoprocessorHost() != null) { 3045 result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result); 3046 } 3047 } 3048 MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 3049 if (metricsRegionServer != null) { 3050 long after = EnvironmentEdgeManager.currentTime(); 3051 metricsRegionServer.updateCheckAndMutate( 3052 region.getRegionInfo().getTable(), after - before); 3053 3054 MutationType type = mutation.getMutateType(); 3055 switch (type) { 3056 case PUT: 3057 metricsRegionServer.updateCheckAndPut( 3058 region.getRegionInfo().getTable(), after - before); 3059 break; 3060 case DELETE: 3061 
metricsRegionServer.updateCheckAndDelete( 3062 region.getRegionInfo().getTable(), after - before); 3063 break; 3064 default: 3065 break; 3066 } 3067 } 3068 return result; 3069 } 3070 3071 // This is used to keep compatible with the old client implementation. Consider remove it if we 3072 // decide to drop the support of the client that still sends close request to a region scanner 3073 // which has already been exhausted. 3074 @Deprecated 3075 private static final IOException SCANNER_ALREADY_CLOSED = new IOException() { 3076 3077 private static final long serialVersionUID = -4305297078988180130L; 3078 3079 @Override 3080 public synchronized Throwable fillInStackTrace() { 3081 return this; 3082 } 3083 }; 3084 3085 private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException { 3086 String scannerName = Long.toString(request.getScannerId()); 3087 RegionScannerHolder rsh = scanners.get(scannerName); 3088 if (rsh == null) { 3089 // just ignore the next or close request if scanner does not exists. 3090 if (closedScanners.getIfPresent(scannerName) != null) { 3091 throw SCANNER_ALREADY_CLOSED; 3092 } else { 3093 LOG.warn("Client tried to access missing scanner " + scannerName); 3094 throw new UnknownScannerException( 3095 "Unknown scanner '" + scannerName + "'. 
This can happen due to any of the following " + 3096 "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " + 3097 "long wait between consecutive client checkins, c) Server may be closing down, " + 3098 "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " + 3099 "possible fix would be increasing the value of" + 3100 "'hbase.client.scanner.timeout.period' configuration."); 3101 } 3102 } 3103 RegionInfo hri = rsh.s.getRegionInfo(); 3104 // Yes, should be the same instance 3105 if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) { 3106 String msg = "Region has changed on the scanner " + scannerName + ": regionName=" 3107 + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r; 3108 LOG.warn(msg + ", closing..."); 3109 scanners.remove(scannerName); 3110 try { 3111 rsh.s.close(); 3112 } catch (IOException e) { 3113 LOG.warn("Getting exception closing " + scannerName, e); 3114 } finally { 3115 try { 3116 regionServer.getLeaseManager().cancelLease(scannerName); 3117 } catch (LeaseException e) { 3118 LOG.warn("Getting exception closing " + scannerName, e); 3119 } 3120 } 3121 throw new NotServingRegionException(msg); 3122 } 3123 return rsh; 3124 } 3125 3126 private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.Builder builder) 3127 throws IOException { 3128 HRegion region = getRegion(request.getRegion()); 3129 ClientProtos.Scan protoScan = request.getScan(); 3130 boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); 3131 Scan scan = ProtobufUtil.toScan(protoScan); 3132 // if the request doesn't set this, get the default region setting. 
3133 if (!isLoadingCfsOnDemandSet) { 3134 scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); 3135 } 3136 3137 if (!scan.hasFamilies()) { 3138 // Adding all families to scanner 3139 for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) { 3140 scan.addFamily(family); 3141 } 3142 } 3143 if (region.getCoprocessorHost() != null) { 3144 // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a 3145 // wrapper for the core created RegionScanner 3146 region.getCoprocessorHost().preScannerOpen(scan); 3147 } 3148 RegionScannerImpl coreScanner = region.getScanner(scan); 3149 Shipper shipper = coreScanner; 3150 RegionScanner scanner = coreScanner; 3151 if (region.getCoprocessorHost() != null) { 3152 scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); 3153 } 3154 long scannerId = scannerIdGenerator.generateNewScannerId(); 3155 builder.setScannerId(scannerId); 3156 builder.setMvccReadPoint(scanner.getMvccReadPoint()); 3157 builder.setTtl(scannerLeaseTimeoutPeriod); 3158 String scannerName = String.valueOf(scannerId); 3159 3160 boolean fullRegionScan = !region.getRegionInfo().getTable().isSystemTable() && 3161 isFullRegionScan(scan, region); 3162 3163 return addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), 3164 fullRegionScan); 3165 } 3166 3167 private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh) 3168 throws OutOfOrderScannerNextException { 3169 // if nextCallSeq does not match throw Exception straight away. This needs to be 3170 // performed even before checking of Lease. 
3171 // See HBASE-5974 3172 if (request.hasNextCallSeq()) { 3173 long callSeq = request.getNextCallSeq(); 3174 if (!rsh.incNextCallSeq(callSeq)) { 3175 throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.getNextCallSeq() 3176 + " But the nextCallSeq got from client: " + request.getNextCallSeq() + "; request=" 3177 + TextFormat.shortDebugString(request)); 3178 } 3179 } 3180 } 3181 3182 private void addScannerLeaseBack(LeaseManager.Lease lease) { 3183 try { 3184 regionServer.getLeaseManager().addLease(lease); 3185 } catch (LeaseStillHeldException e) { 3186 // should not happen as the scanner id is unique. 3187 throw new AssertionError(e); 3188 } 3189 } 3190 3191 private long getTimeLimit(HBaseRpcController controller, boolean allowHeartbeatMessages) { 3192 // Set the time limit to be half of the more restrictive timeout value (one of the 3193 // timeout values must be positive). In the event that both values are positive, the 3194 // more restrictive of the two is used to calculate the limit. 3195 if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) { 3196 long timeLimitDelta; 3197 if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) { 3198 timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout); 3199 } else { 3200 timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout; 3201 } 3202 if (controller != null && controller.getCallTimeout() > 0) { 3203 timeLimitDelta = Math.min(timeLimitDelta, controller.getCallTimeout()); 3204 } 3205 // Use half of whichever timeout value was more restrictive... But don't allow 3206 // the time limit to be less than the allowable minimum (could cause an 3207 // immediatate timeout before scanning any data). 3208 timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta); 3209 // XXX: Can not use EnvironmentEdge here because TestIncrementTimeRange use a 3210 // ManualEnvironmentEdge. Consider using System.nanoTime instead. 
3211 return System.currentTimeMillis() + timeLimitDelta; 3212 } 3213 // Default value of timeLimit is negative to indicate no timeLimit should be 3214 // enforced. 3215 return -1L; 3216 } 3217 3218 private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows, 3219 ScannerContext scannerContext, ScanResponse.Builder builder) { 3220 if (numOfCompleteRows >= limitOfRows) { 3221 if (LOG.isTraceEnabled()) { 3222 LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows + 3223 " scannerContext: " + scannerContext); 3224 } 3225 builder.setMoreResults(false); 3226 } 3227 } 3228 3229 // return whether we have more results in region. 3230 private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh, 3231 long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results, 3232 ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCallContext context) 3233 throws IOException { 3234 HRegion region = rsh.r; 3235 RegionScanner scanner = rsh.s; 3236 long maxResultSize; 3237 if (scanner.getMaxResultSize() > 0) { 3238 maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize); 3239 } else { 3240 maxResultSize = maxQuotaResultSize; 3241 } 3242 // This is cells inside a row. Default size is 10 so if many versions or many cfs, 3243 // then we'll resize. Resizings show in profiler. Set it higher than 10. For now 3244 // arbitrary 32. TODO: keep record of general size of results being returned. 
3245 List<Cell> values = new ArrayList<>(32); 3246 region.startRegionOperation(Operation.SCAN); 3247 try { 3248 int numOfResults = 0; 3249 int numOfCompleteRows = 0; 3250 long before = EnvironmentEdgeManager.currentTime(); 3251 synchronized (scanner) { 3252 boolean stale = (region.getRegionInfo().getReplicaId() != 0); 3253 boolean clientHandlesPartials = 3254 request.hasClientHandlesPartials() && request.getClientHandlesPartials(); 3255 boolean clientHandlesHeartbeats = 3256 request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats(); 3257 3258 // On the server side we must ensure that the correct ordering of partial results is 3259 // returned to the client to allow them to properly reconstruct the partial results. 3260 // If the coprocessor host is adding to the result list, we cannot guarantee the 3261 // correct ordering of partial results and so we prevent partial results from being 3262 // formed. 3263 boolean serverGuaranteesOrderOfPartials = results.isEmpty(); 3264 boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials; 3265 boolean moreRows = false; 3266 3267 // Heartbeat messages occur when the processing of the ScanRequest is exceeds a 3268 // certain time threshold on the server. When the time threshold is exceeded, the 3269 // server stops the scan and sends back whatever Results it has accumulated within 3270 // that time period (may be empty). Since heartbeat messages have the potential to 3271 // create partial Results (in the event that the timeout occurs in the middle of a 3272 // row), we must only generate heartbeat messages when the client can handle both 3273 // heartbeats AND partials 3274 boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults; 3275 3276 long timeLimit = getTimeLimit(controller, allowHeartbeatMessages); 3277 3278 final LimitScope sizeScope = 3279 allowPartialResults ? 
LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; 3280 final LimitScope timeScope = 3281 allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; 3282 3283 boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics(); 3284 3285 // Configure with limits for this RPC. Set keep progress true since size progress 3286 // towards size limit should be kept between calls to nextRaw 3287 ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); 3288 // maxResultSize - either we can reach this much size for all cells(being read) data or sum 3289 // of heap size occupied by cells(being read). Cell data means its key and value parts. 3290 contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize); 3291 contextBuilder.setBatchLimit(scanner.getBatch()); 3292 contextBuilder.setTimeLimit(timeScope, timeLimit); 3293 contextBuilder.setTrackMetrics(trackMetrics); 3294 ScannerContext scannerContext = contextBuilder.build(); 3295 boolean limitReached = false; 3296 while (numOfResults < maxResults) { 3297 // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The 3298 // batch limit is a limit on the number of cells per Result. Thus, if progress is 3299 // being tracked (i.e. scannerContext.keepProgress() is true) then we need to 3300 // reset the batch progress between nextRaw invocations since we don't want the 3301 // batch progress from previous calls to affect future calls 3302 scannerContext.setBatchProgress(0); 3303 3304 // Collect values to be returned here 3305 moreRows = scanner.nextRaw(values, scannerContext); 3306 3307 if (!values.isEmpty()) { 3308 if (limitOfRows > 0) { 3309 // First we need to check if the last result is partial and we have a row change. If 3310 // so then we need to increase the numOfCompleteRows. 
3311 if (results.isEmpty()) { 3312 if (rsh.rowOfLastPartialResult != null && 3313 !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)) { 3314 numOfCompleteRows++; 3315 checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, 3316 builder); 3317 } 3318 } else { 3319 Result lastResult = results.get(results.size() - 1); 3320 if (lastResult.mayHaveMoreCellsInRow() && 3321 !CellUtil.matchingRows(values.get(0), lastResult.getRow())) { 3322 numOfCompleteRows++; 3323 checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, 3324 builder); 3325 } 3326 } 3327 if (builder.hasMoreResults() && !builder.getMoreResults()) { 3328 break; 3329 } 3330 } 3331 boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow(); 3332 Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow); 3333 lastBlock.setValue(addSize(context, r, lastBlock.getValue())); 3334 results.add(r); 3335 numOfResults++; 3336 if (!mayHaveMoreCellsInRow && limitOfRows > 0) { 3337 numOfCompleteRows++; 3338 checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder); 3339 if (builder.hasMoreResults() && !builder.getMoreResults()) { 3340 break; 3341 } 3342 } 3343 } else if (!moreRows && !results.isEmpty()) { 3344 // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of 3345 // last result is false. Otherwise it's possible that: the first nextRaw returned 3346 // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the 3347 // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will 3348 // return with moreRows=false, which means moreResultsInRegion would be false, it will 3349 // be a contradictory state (HBASE-21206). 
3350 int lastIdx = results.size() - 1; 3351 Result r = results.get(lastIdx); 3352 if (r.mayHaveMoreCellsInRow()) { 3353 results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false)); 3354 } 3355 } 3356 boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS); 3357 boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS); 3358 boolean resultsLimitReached = numOfResults >= maxResults; 3359 limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached; 3360 3361 if (limitReached || !moreRows) { 3362 // We only want to mark a ScanResponse as a heartbeat message in the event that 3363 // there are more values to be read server side. If there aren't more values, 3364 // marking it as a heartbeat is wasteful because the client will need to issue 3365 // another ScanRequest only to realize that they already have all the values 3366 if (moreRows && timeLimitReached) { 3367 // Heartbeat messages occur when the time limit has been reached. 3368 builder.setHeartbeatMessage(true); 3369 if (rsh.needCursor) { 3370 Cell cursorCell = scannerContext.getLastPeekedCell(); 3371 if (cursorCell != null) { 3372 builder.setCursor(ProtobufUtil.toCursor(cursorCell)); 3373 } 3374 } 3375 } 3376 break; 3377 } 3378 values.clear(); 3379 } 3380 builder.setMoreResultsInRegion(moreRows); 3381 // Check to see if the client requested that we track metrics server side. If the 3382 // client requested metrics, retrieve the metrics from the scanner context. 
3383 if (trackMetrics) { 3384 Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap(); 3385 ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder(); 3386 NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder(); 3387 3388 for (Entry<String, Long> entry : metrics.entrySet()) { 3389 pairBuilder.setName(entry.getKey()); 3390 pairBuilder.setValue(entry.getValue()); 3391 metricBuilder.addMetrics(pairBuilder.build()); 3392 } 3393 3394 builder.setScanMetrics(metricBuilder.build()); 3395 } 3396 } 3397 long end = EnvironmentEdgeManager.currentTime(); 3398 long responseCellSize = context != null ? context.getResponseCellSize() : 0; 3399 region.getMetrics().updateScanTime(end - before); 3400 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 3401 if (metricsRegionServer != null) { 3402 metricsRegionServer.updateScanSize( 3403 region.getTableDescriptor().getTableName(), responseCellSize); 3404 metricsRegionServer.updateScanTime( 3405 region.getTableDescriptor().getTableName(), end - before); 3406 } 3407 } finally { 3408 region.closeRegionOperation(); 3409 } 3410 // coprocessor postNext hook 3411 if (region.getCoprocessorHost() != null) { 3412 region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true); 3413 } 3414 } 3415 3416 /** 3417 * Scan data in a table. 3418 * 3419 * @param controller the RPC controller 3420 * @param request the scan request 3421 * @throws ServiceException 3422 */ 3423 @Override 3424 public ScanResponse scan(final RpcController controller, final ScanRequest request) 3425 throws ServiceException { 3426 if (controller != null && !(controller instanceof HBaseRpcController)) { 3427 throw new UnsupportedOperationException( 3428 "We only do " + "HBaseRpcControllers! 
FIX IF A PROBLEM: " + controller); 3429 } 3430 if (!request.hasScannerId() && !request.hasScan()) { 3431 throw new ServiceException( 3432 new DoNotRetryIOException("Missing required input: scannerId or scan")); 3433 } 3434 try { 3435 checkOpen(); 3436 } catch (IOException e) { 3437 if (request.hasScannerId()) { 3438 String scannerName = Long.toString(request.getScannerId()); 3439 if (LOG.isDebugEnabled()) { 3440 LOG.debug( 3441 "Server shutting down and client tried to access missing scanner " + scannerName); 3442 } 3443 final LeaseManager leaseManager = regionServer.getLeaseManager(); 3444 if (leaseManager != null) { 3445 try { 3446 leaseManager.cancelLease(scannerName); 3447 } catch (LeaseException le) { 3448 // No problem, ignore 3449 if (LOG.isTraceEnabled()) { 3450 LOG.trace("Un-able to cancel lease of scanner. It could already be closed."); 3451 } 3452 } 3453 } 3454 } 3455 throw new ServiceException(e); 3456 } 3457 requestCount.increment(); 3458 rpcScanRequestCount.increment(); 3459 RegionScannerHolder rsh; 3460 ScanResponse.Builder builder = ScanResponse.newBuilder(); 3461 try { 3462 if (request.hasScannerId()) { 3463 // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000 3464 // for more details. 3465 builder.setScannerId(request.getScannerId()); 3466 rsh = getRegionScanner(request); 3467 } else { 3468 rsh = newRegionScanner(request, builder); 3469 } 3470 } catch (IOException e) { 3471 if (e == SCANNER_ALREADY_CLOSED) { 3472 // Now we will close scanner automatically if there are no more results for this region but 3473 // the old client will still send a close request to us. Just ignore it and return. 
3474 return builder.build(); 3475 } 3476 throw new ServiceException(e); 3477 } 3478 if (rsh.fullRegionScan) { 3479 rpcFullScanRequestCount.increment(); 3480 } 3481 HRegion region = rsh.r; 3482 String scannerName = rsh.scannerName; 3483 LeaseManager.Lease lease; 3484 try { 3485 // Remove lease while its being processed in server; protects against case 3486 // where processing of request takes > lease expiration time. 3487 lease = regionServer.getLeaseManager().removeLease(scannerName); 3488 } catch (LeaseException e) { 3489 throw new ServiceException(e); 3490 } 3491 if (request.hasRenew() && request.getRenew()) { 3492 // add back and return 3493 addScannerLeaseBack(lease); 3494 try { 3495 checkScanNextCallSeq(request, rsh); 3496 } catch (OutOfOrderScannerNextException e) { 3497 throw new ServiceException(e); 3498 } 3499 return builder.build(); 3500 } 3501 OperationQuota quota; 3502 try { 3503 quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN); 3504 } catch (IOException e) { 3505 addScannerLeaseBack(lease); 3506 throw new ServiceException(e); 3507 } 3508 try { 3509 checkScanNextCallSeq(request, rsh); 3510 } catch (OutOfOrderScannerNextException e) { 3511 addScannerLeaseBack(lease); 3512 throw new ServiceException(e); 3513 } 3514 // Now we have increased the next call sequence. If we give client an error, the retry will 3515 // never success. So we'd better close the scanner and return a DoNotRetryIOException to client 3516 // and then client will try to open a new scanner. 3517 boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false; 3518 int rows; // this is scan.getCaching 3519 if (request.hasNumberOfRows()) { 3520 rows = request.getNumberOfRows(); 3521 } else { 3522 rows = closeScanner ? 0 : 1; 3523 } 3524 RpcCallContext context = RpcServer.getCurrentCall().orElse(null); 3525 // now let's do the real scan. 
3526 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); 3527 RegionScanner scanner = rsh.s; 3528 // this is the limit of rows for this scan, if we the number of rows reach this value, we will 3529 // close the scanner. 3530 int limitOfRows; 3531 if (request.hasLimitOfRows()) { 3532 limitOfRows = request.getLimitOfRows(); 3533 } else { 3534 limitOfRows = -1; 3535 } 3536 MutableObject<Object> lastBlock = new MutableObject<>(); 3537 boolean scannerClosed = false; 3538 try { 3539 List<Result> results = new ArrayList<>(Math.min(rows, 512)); 3540 if (rows > 0) { 3541 boolean done = false; 3542 // Call coprocessor. Get region info from scanner. 3543 if (region.getCoprocessorHost() != null) { 3544 Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows); 3545 if (!results.isEmpty()) { 3546 for (Result r : results) { 3547 lastBlock.setValue(addSize(context, r, lastBlock.getValue())); 3548 } 3549 } 3550 if (bypass != null && bypass.booleanValue()) { 3551 done = true; 3552 } 3553 } 3554 if (!done) { 3555 scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows, 3556 results, builder, lastBlock, context); 3557 } else { 3558 builder.setMoreResultsInRegion(!results.isEmpty()); 3559 } 3560 } else { 3561 // This is a open scanner call with numberOfRow = 0, so set more results in region to true. 3562 builder.setMoreResultsInRegion(true); 3563 } 3564 3565 quota.addScanResult(results); 3566 addResults(builder, results, (HBaseRpcController) controller, 3567 RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()), 3568 isClientCellBlockSupport(context)); 3569 if (scanner.isFilterDone() && results.isEmpty()) { 3570 // If the scanner's filter - if any - is done with the scan 3571 // only set moreResults to false if the results is empty. This is used to keep compatible 3572 // with the old scan implementation where we just ignore the returned results if moreResults 3573 // is false. 
Can remove the isEmpty check after we get rid of the old implementation. 3574 builder.setMoreResults(false); 3575 } 3576 // Later we may close the scanner depending on this flag so here we need to make sure that we 3577 // have already set this flag. 3578 assert builder.hasMoreResultsInRegion(); 3579 // we only set moreResults to false in the above code, so set it to true if we haven't set it 3580 // yet. 3581 if (!builder.hasMoreResults()) { 3582 builder.setMoreResults(true); 3583 } 3584 if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) { 3585 // Record the last cell of the last result if it is a partial result 3586 // We need this to calculate the complete rows we have returned to client as the 3587 // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the 3588 // current row. We may filter out all the remaining cells for the current row and just 3589 // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to 3590 // check for row change. 3591 Result lastResult = results.get(results.size() - 1); 3592 if (lastResult.mayHaveMoreCellsInRow()) { 3593 rsh.rowOfLastPartialResult = lastResult.getRow(); 3594 } else { 3595 rsh.rowOfLastPartialResult = null; 3596 } 3597 } 3598 if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) { 3599 scannerClosed = true; 3600 closeScanner(region, scanner, scannerName, context); 3601 } 3602 return builder.build(); 3603 } catch (IOException e) { 3604 try { 3605 // scanner is closed here 3606 scannerClosed = true; 3607 // The scanner state might be left in a dirty state, so we will tell the Client to 3608 // fail this RPC and close the scanner while opening up another one from the start of 3609 // row that the client has last seen. 3610 closeScanner(region, scanner, scannerName, context); 3611 3612 // If it is a DoNotRetryIOException already, throw as it is. 
Unfortunately, DNRIOE is 3613 // used in two different semantics. 3614 // (1) The first is to close the client scanner and bubble up the exception all the way 3615 // to the application. This is preferred when the exception is really un-recoverable 3616 // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this 3617 // bucket usually. 3618 // (2) Second semantics is to close the current region scanner only, but continue the 3619 // client scanner by overriding the exception. This is usually UnknownScannerException, 3620 // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the 3621 // application-level ClientScanner has to continue without bubbling up the exception to 3622 // the client. See ClientScanner code to see how it deals with these special exceptions. 3623 if (e instanceof DoNotRetryIOException) { 3624 throw e; 3625 } 3626 3627 // If it is a FileNotFoundException, wrap as a 3628 // DoNotRetryIOException. This can avoid the retry in ClientScanner. 3629 if (e instanceof FileNotFoundException) { 3630 throw new DoNotRetryIOException(e); 3631 } 3632 3633 // We closed the scanner already. Instead of throwing the IOException, and client 3634 // retrying with the same scannerId only to get USE on the next RPC, we directly throw 3635 // a special exception to save an RPC. 3636 if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) { 3637 // 1.4.0+ clients know how to handle 3638 throw new ScannerResetException("Scanner is closed on the server-side", e); 3639 } else { 3640 // older clients do not know about SRE. Just throw USE, which they will handle 3641 throw new UnknownScannerException("Throwing UnknownScannerException to reset the client" 3642 + " scanner state for clients older than 1.3.", e); 3643 } 3644 } catch (IOException ioe) { 3645 throw new ServiceException(ioe); 3646 } 3647 } finally { 3648 if (!scannerClosed) { 3649 // Adding resets expiration time on lease. 
3650 // the closeCallBack will be set in closeScanner so here we only care about shippedCallback 3651 if (context != null) { 3652 context.setCallBack(rsh.shippedCallback); 3653 } else { 3654 // When context != null, adding back the lease will be done in callback set above. 3655 addScannerLeaseBack(lease); 3656 } 3657 } 3658 quota.close(); 3659 } 3660 } 3661 3662 private void closeScanner(HRegion region, RegionScanner scanner, String scannerName, 3663 RpcCallContext context) throws IOException { 3664 if (region.getCoprocessorHost() != null) { 3665 if (region.getCoprocessorHost().preScannerClose(scanner)) { 3666 // bypass the actual close. 3667 return; 3668 } 3669 } 3670 RegionScannerHolder rsh = scanners.remove(scannerName); 3671 if (rsh != null) { 3672 if (context != null) { 3673 context.setCallBack(rsh.closeCallBack); 3674 } else { 3675 rsh.s.close(); 3676 } 3677 if (region.getCoprocessorHost() != null) { 3678 region.getCoprocessorHost().postScannerClose(scanner); 3679 } 3680 closedScanners.put(scannerName, scannerName); 3681 } 3682 } 3683 3684 @Override 3685 public CoprocessorServiceResponse execRegionServerService(RpcController controller, 3686 CoprocessorServiceRequest request) throws ServiceException { 3687 rpcPreCheck("execRegionServerService"); 3688 return regionServer.execRegionServerService(controller, request); 3689 } 3690 3691 @Override 3692 public UpdateConfigurationResponse updateConfiguration( 3693 RpcController controller, UpdateConfigurationRequest request) 3694 throws ServiceException { 3695 try { 3696 this.regionServer.updateConfiguration(); 3697 } catch (Exception e) { 3698 throw new ServiceException(e); 3699 } 3700 return UpdateConfigurationResponse.getDefaultInstance(); 3701 } 3702 3703 @Override 3704 public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots( 3705 RpcController controller, GetSpaceQuotaSnapshotsRequest request) throws ServiceException { 3706 try { 3707 final RegionServerSpaceQuotaManager manager = 3708 
regionServer.getRegionServerSpaceQuotaManager(); 3709 final GetSpaceQuotaSnapshotsResponse.Builder builder = 3710 GetSpaceQuotaSnapshotsResponse.newBuilder(); 3711 if (manager != null) { 3712 final Map<TableName,SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots(); 3713 for (Entry<TableName,SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) { 3714 builder.addSnapshots(TableQuotaSnapshot.newBuilder() 3715 .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey())) 3716 .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())) 3717 .build()); 3718 } 3719 } 3720 return builder.build(); 3721 } catch (Exception e) { 3722 throw new ServiceException(e); 3723 } 3724 } 3725 3726 @Override 3727 public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller, 3728 ClearRegionBlockCacheRequest request) { 3729 ClearRegionBlockCacheResponse.Builder builder = 3730 ClearRegionBlockCacheResponse.newBuilder(); 3731 CacheEvictionStatsBuilder stats = CacheEvictionStats.builder(); 3732 List<HRegion> regions = getRegions(request.getRegionList(), stats); 3733 for (HRegion region : regions) { 3734 try { 3735 stats = stats.append(this.regionServer.clearRegionBlockCache(region)); 3736 } catch (Exception e) { 3737 stats.addException(region.getRegionInfo().getRegionName(), e); 3738 } 3739 } 3740 stats.withMaxCacheSize(regionServer.getBlockCache().map(BlockCache::getMaxSize).orElse(0L)); 3741 return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build(); 3742 } 3743 3744 private void executeOpenRegionProcedures(OpenRegionRequest request, 3745 Map<TableName, TableDescriptor> tdCache) { 3746 long masterSystemTime = request.hasMasterSystemTime() ? 
request.getMasterSystemTime() : -1; 3747 for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) { 3748 RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion()); 3749 TableDescriptor tableDesc = tdCache.get(regionInfo.getTable()); 3750 if (tableDesc == null) { 3751 try { 3752 tableDesc = regionServer.getTableDescriptors().get(regionInfo.getTable()); 3753 } catch (IOException e) { 3754 // Here we do not fail the whole method since we also need deal with other 3755 // procedures, and we can not ignore this one, so we still schedule a 3756 // AssignRegionHandler and it will report back to master if we still can not get the 3757 // TableDescriptor. 3758 LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler", 3759 regionInfo.getTable(), e); 3760 } 3761 } 3762 if (regionOpenInfo.getFavoredNodesCount() > 0) { 3763 regionServer.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(), 3764 regionOpenInfo.getFavoredNodesList()); 3765 } 3766 long procId = regionOpenInfo.getOpenProcId(); 3767 if (regionServer.submitRegionProcedure(procId)) { 3768 regionServer.executorService.submit(AssignRegionHandler 3769 .create(regionServer, regionInfo, procId, tableDesc, 3770 masterSystemTime)); 3771 } 3772 } 3773 } 3774 3775 private void executeCloseRegionProcedures(CloseRegionRequest request) { 3776 String encodedName; 3777 try { 3778 encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion()); 3779 } catch (DoNotRetryIOException e) { 3780 throw new UncheckedIOException("Should not happen", e); 3781 } 3782 ServerName destination = request.hasDestinationServer() ? 
3783 ProtobufUtil.toServerName(request.getDestinationServer()) : 3784 null; 3785 long procId = request.getCloseProcId(); 3786 if (regionServer.submitRegionProcedure(procId)) { 3787 regionServer.executorService.submit(UnassignRegionHandler 3788 .create(regionServer, encodedName, procId, false, destination)); 3789 } 3790 } 3791 3792 private void executeProcedures(RemoteProcedureRequest request) { 3793 RSProcedureCallable callable; 3794 try { 3795 callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class) 3796 .getDeclaredConstructor().newInstance(); 3797 } catch (Exception e) { 3798 LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(), 3799 request.getProcId(), e); 3800 regionServer.remoteProcedureComplete(request.getProcId(), e); 3801 return; 3802 } 3803 callable.init(request.getProcData().toByteArray(), regionServer); 3804 LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId()); 3805 regionServer.executeProcedure(request.getProcId(), callable); 3806 } 3807 3808 @Override 3809 @QosPriority(priority = HConstants.ADMIN_QOS) 3810 public ExecuteProceduresResponse executeProcedures(RpcController controller, 3811 ExecuteProceduresRequest request) throws ServiceException { 3812 try { 3813 checkOpen(); 3814 throwOnWrongStartCode(request); 3815 regionServer.getRegionServerCoprocessorHost().preExecuteProcedures(); 3816 if (request.getOpenRegionCount() > 0) { 3817 // Avoid reading from the TableDescritor every time(usually it will read from the file 3818 // system) 3819 Map<TableName, TableDescriptor> tdCache = new HashMap<>(); 3820 request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache)); 3821 } 3822 if (request.getCloseRegionCount() > 0) { 3823 request.getCloseRegionList().forEach(this::executeCloseRegionProcedures); 3824 } 3825 if (request.getProcCount() > 0) { 3826 request.getProcList().forEach(this::executeProcedures); 3827 } 3828 
regionServer.getRegionServerCoprocessorHost().postExecuteProcedures(); 3829 return ExecuteProceduresResponse.getDefaultInstance(); 3830 } catch (IOException e) { 3831 throw new ServiceException(e); 3832 } 3833 } 3834 3835 @Override 3836 @QosPriority(priority = HConstants.ADMIN_QOS) 3837 public SlowLogResponses getSlowLogResponses(final RpcController controller, 3838 final SlowLogResponseRequest request) { 3839 final NamedQueueRecorder namedQueueRecorder = 3840 this.regionServer.getNamedQueueRecorder(); 3841 final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder); 3842 SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder() 3843 .addAllSlowLogPayloads(slowLogPayloads) 3844 .build(); 3845 return slowLogResponses; 3846 } 3847 3848 private List<SlowLogPayload> getSlowLogPayloads(SlowLogResponseRequest request, 3849 NamedQueueRecorder namedQueueRecorder) { 3850 if (namedQueueRecorder == null) { 3851 return Collections.emptyList(); 3852 } 3853 List<SlowLogPayload> slowLogPayloads; 3854 NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest(); 3855 namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT); 3856 namedQueueGetRequest.setSlowLogResponseRequest(request); 3857 NamedQueueGetResponse namedQueueGetResponse = 3858 namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest); 3859 slowLogPayloads = namedQueueGetResponse != null ? 
3860 namedQueueGetResponse.getSlowLogPayloads() : 3861 Collections.emptyList(); 3862 return slowLogPayloads; 3863 } 3864 3865 @Override 3866 @QosPriority(priority = HConstants.ADMIN_QOS) 3867 public SlowLogResponses getLargeLogResponses(final RpcController controller, 3868 final SlowLogResponseRequest request) { 3869 final NamedQueueRecorder namedQueueRecorder = 3870 this.regionServer.getNamedQueueRecorder(); 3871 final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder); 3872 SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder() 3873 .addAllSlowLogPayloads(slowLogPayloads) 3874 .build(); 3875 return slowLogResponses; 3876 } 3877 3878 @Override 3879 @QosPriority(priority = HConstants.ADMIN_QOS) 3880 public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller, 3881 final ClearSlowLogResponseRequest request) { 3882 final NamedQueueRecorder namedQueueRecorder = 3883 this.regionServer.getNamedQueueRecorder(); 3884 boolean slowLogsCleaned = Optional.ofNullable(namedQueueRecorder) 3885 .map(queueRecorder -> 3886 queueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG)) 3887 .orElse(false); 3888 ClearSlowLogResponses clearSlowLogResponses = ClearSlowLogResponses.newBuilder() 3889 .setIsCleaned(slowLogsCleaned) 3890 .build(); 3891 return clearSlowLogResponses; 3892 } 3893 3894 @Override 3895 public HBaseProtos.LogEntry getLogEntries(RpcController controller, 3896 HBaseProtos.LogRequest request) throws ServiceException { 3897 try { 3898 final String logClassName = request.getLogClassName(); 3899 Class<?> logClass = Class.forName(logClassName) 3900 .asSubclass(Message.class); 3901 Method method = logClass.getMethod("parseFrom", ByteString.class); 3902 if (logClassName.contains("SlowLogResponseRequest")) { 3903 SlowLogResponseRequest slowLogResponseRequest = 3904 (SlowLogResponseRequest) method.invoke(null, request.getLogMessage()); 3905 final NamedQueueRecorder namedQueueRecorder = 3906 
this.regionServer.getNamedQueueRecorder(); 3907 final List<SlowLogPayload> slowLogPayloads = 3908 getSlowLogPayloads(slowLogResponseRequest, namedQueueRecorder); 3909 SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder() 3910 .addAllSlowLogPayloads(slowLogPayloads) 3911 .build(); 3912 return HBaseProtos.LogEntry.newBuilder() 3913 .setLogClassName(slowLogResponses.getClass().getName()) 3914 .setLogMessage(slowLogResponses.toByteString()).build(); 3915 } 3916 } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException 3917 | InvocationTargetException e) { 3918 LOG.error("Error while retrieving log entries.", e); 3919 throw new ServiceException(e); 3920 } 3921 throw new ServiceException("Invalid request params"); 3922 } 3923 3924 public RpcScheduler getRpcScheduler() { 3925 return rpcServer.getScheduler(); 3926 } 3927 3928 protected AccessChecker getAccessChecker() { 3929 return accessChecker; 3930 } 3931 3932 protected ZKPermissionWatcher getZkPermissionWatcher() { 3933 return zkPermissionWatcher; 3934 } 3935}